Merge pull request #2 from lbryio/server_upgrade
Upgrades Server to Allow Production Deployment
This commit is contained in:
commit
1986f85a30
18 changed files with 317 additions and 164 deletions
8
config/comment-server.target
Normal file
8
config/comment-server.target
Normal file
|
@ -0,0 +1,8 @@
|
|||
[Unit]
|
||||
Description="LBRY Comment Server Watchdog"
|
||||
After=network.target
|
||||
Requires=comment-server@5921.service comment-server@5922.service comment-server@5923.service comment-server@5924.service
|
||||
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
15
config/comment-server@.service
Normal file
15
config/comment-server@.service
Normal file
|
@ -0,0 +1,15 @@
|
|||
[Unit]
|
||||
Description="LBRY Comment Server #%i"
|
||||
PartOf=comment-server.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=oleg
|
||||
WorkingDirectory=/home/lbry/comment-server/
|
||||
ExecStart=/home/lbry/comment-server/venv/bin/commentserv --port %i
|
||||
Restart=on-failure
|
||||
KillMode=process
|
||||
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
51
config/nginx.conf
Normal file
51
config/nginx.conf
Normal file
|
@ -0,0 +1,51 @@
|
|||
worker_processes 1;
|
||||
daemon off;
|
||||
error_log /dev/stdout warn;
|
||||
|
||||
events {
|
||||
worker_connections 1024;
|
||||
accept_mutex off;
|
||||
}
|
||||
|
||||
http {
|
||||
|
||||
server {
|
||||
default_type application/json;
|
||||
listen 80;
|
||||
client_max_body_size 4G;
|
||||
server_name example;
|
||||
|
||||
location / {
|
||||
proxy_set_header Host $http_host;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_redirect off;
|
||||
proxy_buffering off;
|
||||
|
||||
proxy_pass http://aiohttp;
|
||||
}
|
||||
}
|
||||
|
||||
# server {
|
||||
# listen 5921;
|
||||
# location /api {
|
||||
# proxy_set_header HOST $host;
|
||||
# proxy_set_header X-Forwarded-Proto $scheme;
|
||||
# proxy_set_header X-Real-IP $remote_addr;
|
||||
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
#
|
||||
# proxy_redirect off;
|
||||
# proxy_pass http://localhost:2903;
|
||||
#
|
||||
# }
|
||||
#}
|
||||
}
|
||||
|
||||
http {
|
||||
upstream aiohttp {
|
||||
server 127.0.0.1:5921 fail_timeout = 60;
|
||||
server 127.0.0.1:5922 fail_timeout = 60;
|
||||
server 127.0.0.1:5923 fail_timeout = 60;
|
||||
server 127.0.0.1:5924 fail_timeout = 60;
|
||||
|
||||
}
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
Faker>=1.0.7
|
||||
asyncio>=3.4.3
|
||||
aiohttp==3.5.4
|
||||
aiojobs==0.2.2
|
||||
ecdsa==0.13
|
||||
cryptography==2.5
|
||||
aiosqlite==0.10.0
|
||||
PyNaCl>=1.3.0
|
||||
requests
|
||||
cython
|
28
setup.py
Normal file
28
setup.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
import os
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
ROOT = os.path.dirname(__name__)
|
||||
|
||||
setup(
|
||||
name='CommentServer',
|
||||
version='0.0.1',
|
||||
packages=find_packages(exclude=('tests',)),
|
||||
entry_points={
|
||||
'console_scripts': 'commentserv=src.main:main'
|
||||
},
|
||||
zip_safe=False,
|
||||
data_files=[('config', ['config/conf.json',])],
|
||||
include_package_data=True,
|
||||
install_requires=[
|
||||
'Faker>=1.0.7',
|
||||
'asyncio>=3.4.3',
|
||||
'aiohttp==3.5.4',
|
||||
'aiojobs==0.2.2',
|
||||
'ecdsa==0.13',
|
||||
'cryptography==2.5',
|
||||
'aiosqlite==0.10.0',
|
||||
'PyNaCl>=1.3.0',
|
||||
'requests',
|
||||
'cython', # Not really needed anymore but w/e
|
||||
]
|
||||
)
|
97
src/app.py
97
src/app.py
|
@ -1,97 +0,0 @@
|
|||
# cython: language_level=3
|
||||
import logging
|
||||
import pathlib
|
||||
import re
|
||||
import time
|
||||
|
||||
import aiojobs
|
||||
import aiojobs.aiohttp
|
||||
import asyncio
|
||||
from aiohttp import web
|
||||
|
||||
from src.schema.db_helpers import setup_database, backup_database
|
||||
from src.database import obtain_connection, DatabaseWriter
|
||||
from src.handles import api_endpoint, get_api_endpoint
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def setup_db_schema(app):
|
||||
if not pathlib.Path(app['db_path']).exists():
|
||||
logger.info('Setting up schema in %s', app['db_path'])
|
||||
setup_database(app['db_path'], app['config']['PATH']['SCHEMA'])
|
||||
else:
|
||||
logger.info(f'Database already exists in {app["db_path"]}, skipping setup')
|
||||
|
||||
|
||||
async def close_comment_scheduler(app):
|
||||
logger.info('Closing comment_scheduler')
|
||||
await app['comment_scheduler'].close()
|
||||
|
||||
|
||||
async def database_backup_routine(app):
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(app['config']['BACKUP_INT'])
|
||||
with app['reader'] as conn:
|
||||
logger.debug('backing up database')
|
||||
backup_database(conn, app['backup'])
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
# noinspection PyDeprecation
|
||||
async def start_background_tasks(app: web.Application):
|
||||
app['reader'] = obtain_connection(app['db_path'], True)
|
||||
app['waitful_backup'] = app.loop.create_task(database_backup_routine(app))
|
||||
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
|
||||
app['db_writer'] = DatabaseWriter(app['db_path'])
|
||||
app['writer'] = app['db_writer'].connection
|
||||
|
||||
|
||||
def insert_to_config(app, conf=None, db_file=None):
|
||||
db_file = db_file if db_file else 'DEFAULT'
|
||||
app['config'] = conf
|
||||
app['db_path'] = conf['PATH'][db_file]
|
||||
app['backup'] = re.sub(r'\.db$', '.backup.db', app['db_path'])
|
||||
assert app['db_path'] != app['backup']
|
||||
|
||||
|
||||
async def cleanup_background_tasks(app):
|
||||
logger.info('Ending background backup loop')
|
||||
app['waitful_backup'].cancel()
|
||||
await app['waitful_backup']
|
||||
app['reader'].close()
|
||||
app['writer'].close()
|
||||
|
||||
|
||||
def create_app(conf, db_path='DEFAULT', **kwargs):
|
||||
app = web.Application()
|
||||
app['START_TIME'] = int(time.time())
|
||||
insert_to_config(app, conf, db_path)
|
||||
app.on_startup.append(setup_db_schema)
|
||||
app.on_startup.append(start_background_tasks)
|
||||
app.on_shutdown.append(cleanup_background_tasks)
|
||||
app.on_shutdown.append(close_comment_scheduler)
|
||||
aiojobs.aiohttp.setup(app, **kwargs)
|
||||
app.add_routes([
|
||||
web.post('/api', api_endpoint),
|
||||
web.get('/', get_api_endpoint),
|
||||
web.get('/api', get_api_endpoint)
|
||||
])
|
||||
return app
|
||||
|
||||
|
||||
def run_app(config):
|
||||
appl = create_app(conf=config, db_path='DEFAULT', close_timeout=5.0)
|
||||
try:
|
||||
asyncio.run(web.run_app(
|
||||
app=appl,
|
||||
access_log=logging.getLogger('aiohttp.access'),
|
||||
host=config['HOST'],
|
||||
port=config['PORT']
|
||||
))
|
||||
except asyncio.CancelledError:
|
||||
logging.warning('Server going down, asyncio loop raised cancelled error:')
|
||||
except ValueError:
|
||||
logging.exception('Server going down due to value error:')
|
|
@ -1,9 +1,10 @@
|
|||
import logging.config
|
||||
import logging
|
||||
import os
|
||||
from src.settings import config
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
from src.app import run_app
|
||||
from src.settings import config
|
||||
from src.server.app import run_app
|
||||
|
||||
|
||||
def config_logging_from_settings(conf):
|
||||
|
@ -69,7 +70,16 @@ def config_logging_from_settings(conf):
|
|||
logging.config.dictConfig(_config)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
def main(argv=None):
|
||||
argv = argv or sys.argv[1:]
|
||||
parser = argparse.ArgumentParser(description='LBRY Comment Server')
|
||||
parser.add_argument('--port', type=int)
|
||||
args = parser.parse_args(argv)
|
||||
if args.port:
|
||||
config['PORT'] = args.port
|
||||
config_logging_from_settings(config)
|
||||
logger = logging.getLogger(__name__)
|
||||
run_app(config)
|
||||
run_app(config)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
0
src/server/__init__.py
Normal file
0
src/server/__init__.py
Normal file
122
src/server/app.py
Normal file
122
src/server/app.py
Normal file
|
@ -0,0 +1,122 @@
|
|||
# cython: language_level=3
|
||||
import logging
|
||||
import pathlib
|
||||
import re
|
||||
import signal
|
||||
import time
|
||||
|
||||
import aiojobs
|
||||
import aiojobs.aiohttp
|
||||
import asyncio
|
||||
from aiohttp import web
|
||||
|
||||
from src.schema.db_helpers import setup_database, backup_database
|
||||
from src.server.database import obtain_connection, DatabaseWriter
|
||||
from src.server.handles import api_endpoint, get_api_endpoint
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def setup_db_schema(app):
|
||||
if not pathlib.Path(app['db_path']).exists():
|
||||
logger.info('Setting up schema in %s', app['db_path'])
|
||||
setup_database(app['db_path'], app['config']['PATH']['SCHEMA'])
|
||||
else:
|
||||
logger.info(f'Database already exists in {app["db_path"]}, skipping setup')
|
||||
|
||||
|
||||
async def close_comment_scheduler(app):
|
||||
logger.info('Closing comment_scheduler')
|
||||
await app['comment_scheduler'].close()
|
||||
|
||||
|
||||
async def database_backup_routine(app):
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(app['config']['BACKUP_INT'])
|
||||
with app['reader'] as conn:
|
||||
logger.debug('backing up database')
|
||||
backup_database(conn, app['backup'])
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
async def start_background_tasks(app):
|
||||
app['reader'] = obtain_connection(app['db_path'], True)
|
||||
app['waitful_backup'] = app.loop.create_task(database_backup_routine(app))
|
||||
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
|
||||
app['db_writer'] = DatabaseWriter(app['db_path'])
|
||||
app['writer'] = app['db_writer'].connection
|
||||
|
||||
|
||||
async def stop_background_tasks(app):
|
||||
logger.info('Ending background backup loop')
|
||||
app['waitful_backup'].cancel()
|
||||
await app['waitful_backup']
|
||||
app['reader'].close()
|
||||
app['writer'].close()
|
||||
|
||||
|
||||
class CommentDaemon:
|
||||
def __init__(self, config, db_path=None, **kwargs):
|
||||
self.config = config
|
||||
app = web.Application()
|
||||
self.insert_to_config(app, config, db_file=db_path)
|
||||
app.on_startup.append(setup_db_schema)
|
||||
app.on_startup.append(start_background_tasks)
|
||||
app.on_shutdown.append(stop_background_tasks)
|
||||
app.on_shutdown.append(close_comment_scheduler)
|
||||
aiojobs.aiohttp.setup(app, **kwargs)
|
||||
app.add_routes([
|
||||
web.post('/api', api_endpoint),
|
||||
web.get('/', get_api_endpoint),
|
||||
web.get('/api', get_api_endpoint)
|
||||
])
|
||||
self.app = app
|
||||
self.app_runner = web.AppRunner(app)
|
||||
self.app_site = None
|
||||
|
||||
async def start(self):
|
||||
self.app['START_TIME'] = time.time()
|
||||
await self.app_runner.setup()
|
||||
self.app_site = web.TCPSite(
|
||||
runner=self.app_runner,
|
||||
host=self.config['HOST'],
|
||||
port=self.config['PORT'],
|
||||
)
|
||||
await self.app_site.start()
|
||||
logger.info(f'Comment Server is running on {self.config["HOST"]}:{self.config["PORT"]}')
|
||||
|
||||
async def stop(self):
|
||||
await self.app.shutdown()
|
||||
await self.app.cleanup()
|
||||
await self.app_runner.cleanup()
|
||||
|
||||
@staticmethod
|
||||
def insert_to_config(app, conf=None, db_file=None):
|
||||
db_file = db_file if db_file else 'DEFAULT'
|
||||
app['config'] = conf
|
||||
app['db_path'] = conf['PATH'][db_file]
|
||||
app['backup'] = re.sub(r'\.db$', '.backup.db', app['db_path'])
|
||||
assert app['db_path'] != app['backup']
|
||||
|
||||
|
||||
def run_app(config):
|
||||
comment_app = CommentDaemon(config=config, db_path='DEFAULT', close_timeout=5.0)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.set_debug(True)
|
||||
|
||||
def __exit():
|
||||
raise web.GracefulExit()
|
||||
|
||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(comment_app.start())
|
||||
loop.run_forever()
|
||||
except (web.GracefulExit, KeyboardInterrupt, asyncio.CancelledError, ValueError):
|
||||
logging.warning('Server going down, asyncio loop raised cancelled error:')
|
||||
finally:
|
||||
loop.run_until_complete(comment_app.stop())
|
|
@ -6,15 +6,15 @@ import asyncio
|
|||
from aiohttp import web
|
||||
from aiojobs.aiohttp import atomic
|
||||
|
||||
from src.misc import clean_input_params
|
||||
from src.database import get_claim_comments
|
||||
from src.database import get_comments_by_id, get_comment_ids
|
||||
from src.database import get_channel_id_from_comment_id
|
||||
from src.database import obtain_connection
|
||||
from src.misc import is_valid_base_comment
|
||||
from src.misc import is_valid_credential_input
|
||||
from src.misc import make_error
|
||||
from src.writes import delete_comment_if_authorized, write_comment
|
||||
from src.server.misc import clean_input_params
|
||||
from src.server.database import get_claim_comments
|
||||
from src.server.database import get_comments_by_id, get_comment_ids
|
||||
from src.server.database import get_channel_id_from_comment_id
|
||||
from src.server.database import obtain_connection
|
||||
from src.server.misc import is_valid_base_comment
|
||||
from src.server.misc import is_valid_credential_input
|
||||
from src.server.misc import make_error
|
||||
from src.server.writes import delete_comment_if_authorized, write_comment
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -25,22 +25,22 @@ def ping(*args):
|
|||
|
||||
|
||||
def handle_get_channel_from_comment_id(app, kwargs: dict):
|
||||
with obtain_connection(app['db_path']) as conn:
|
||||
with app['reader'] as conn:
|
||||
return get_channel_id_from_comment_id(conn, **kwargs)
|
||||
|
||||
|
||||
def handle_get_comment_ids(app, kwargs):
|
||||
with obtain_connection(app['db_path']) as conn:
|
||||
with app['reader'] as conn:
|
||||
return get_comment_ids(conn, **kwargs)
|
||||
|
||||
|
||||
def handle_get_claim_comments(app, kwargs):
|
||||
with obtain_connection(app['db_path']) as conn:
|
||||
with app['reader'] as conn:
|
||||
return get_claim_comments(conn, **kwargs)
|
||||
|
||||
|
||||
def handle_get_comments_by_id(app, kwargs):
|
||||
with obtain_connection(app['db_path']) as conn:
|
||||
with app['reader'] as conn:
|
||||
return get_comments_by_id(conn, **kwargs)
|
||||
|
||||
|
|
@ -3,13 +3,13 @@ import sqlite3
|
|||
|
||||
from asyncio import coroutine
|
||||
|
||||
from src.database import delete_comment_by_id
|
||||
from src.misc import is_authentic_delete_signal
|
||||
from src.server.database import delete_comment_by_id
|
||||
from src.server.misc import is_authentic_delete_signal
|
||||
|
||||
from src.database import get_comment_or_none
|
||||
from src.database import insert_comment
|
||||
from src.database import insert_channel
|
||||
from src.misc import channel_matches_pattern_or_error
|
||||
from src.server.database import get_comment_or_none
|
||||
from src.server.database import insert_comment
|
||||
from src.server.database import insert_channel
|
||||
from src.server.misc import channel_matches_pattern_or_error
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -5,10 +5,10 @@ from faker.providers import internet
|
|||
from faker.providers import lorem
|
||||
from faker.providers import misc
|
||||
|
||||
from src.database import get_comments_by_id
|
||||
from src.database import get_comment_ids
|
||||
from src.database import get_claim_comments
|
||||
from src.writes import create_comment_or_error
|
||||
from server.database import get_comments_by_id
|
||||
from server.database import get_comment_ids
|
||||
from server.database import get_claim_comments
|
||||
from server.writes import create_comment_or_error
|
||||
from tests.testcase import DatabaseTestCase
|
||||
|
||||
fake = faker.Faker()
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
# * 'gtrp' and 'gtr' create a GET request with or without query parameters;
|
||||
# * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body;
|
||||
# * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data);
|
||||
POST http://localhost:5921/api
|
||||
POST http://localhost:5922/api
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
|
@ -12,12 +12,12 @@ Content-Type: application/json
|
|||
"id": null,
|
||||
"method": "create_comment",
|
||||
"params": {
|
||||
"claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e8",
|
||||
"claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e7",
|
||||
"comment": "This is literally, the most anonymous comment I have EVER seen.",
|
||||
"channel_id": "9cb713f01bf247a0e03170b5ed00d5161340c486",
|
||||
"channel_name": "@bbbbb",
|
||||
"signature": "e8ad5dbc268365e5d16f6a9b5d82323efc550412a7a291aa3e715e31641cc57b18241a132ebed28a848086fca6e200a735281c631c7474b01b4c32851ea3dd57",
|
||||
"signing_ts": "1561669614"
|
||||
"signature": "e8ad5dbd660365efd1646a9b5d82323efc550712a7a291aa3e715e31641cc57b18241a132ebed28a848086fca6e200a735281c631c7474b01b4c32851ea3dd57",
|
||||
"signing_ts": "1561669624"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
# * 'gtrp' and 'gtr' create a GET request with or without query parameters;
|
||||
# * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body;
|
||||
# * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data);
|
||||
POST http://localhost:5921/api
|
||||
POST http://localhost:5922/api
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
|
@ -12,8 +12,9 @@ Content-Type: application/json
|
|||
"id": null,
|
||||
"method": "get_claim_comments",
|
||||
"params": {
|
||||
"claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e8",
|
||||
"page": 1
|
||||
"claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e7",
|
||||
"page": 1,
|
||||
"page_size": 100
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
import unittest
|
||||
from multiprocessing.pool import Pool
|
||||
|
||||
import requests
|
||||
import re
|
||||
from itertools import *
|
||||
|
@ -8,7 +10,7 @@ from faker.providers import internet
|
|||
from faker.providers import lorem
|
||||
from faker.providers import misc
|
||||
|
||||
from src.settings import config
|
||||
from settings import config
|
||||
|
||||
fake = faker.Faker()
|
||||
fake.add_provider(internet)
|
||||
|
@ -34,6 +36,22 @@ def nothing():
|
|||
pass
|
||||
|
||||
|
||||
replace = {
|
||||
'claim_id': fake.sha1,
|
||||
'comment': fake.text,
|
||||
'channel_id': fake.sha1,
|
||||
'channel_name': fake_lbryusername,
|
||||
'signature': fake.uuid4,
|
||||
'parent_id': fake.sha256
|
||||
}
|
||||
|
||||
|
||||
def create_test_comments(values: iter, **default):
|
||||
vars_combo = chain.from_iterable(combinations(values, r) for r in range(1, len(values) + 1))
|
||||
return [{k: replace[k]() if k in comb else v for k, v in default.items()}
|
||||
for comb in vars_combo]
|
||||
|
||||
|
||||
class ServerTest(unittest.TestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -216,23 +234,30 @@ class ListCommentsTest(unittest.TestCase):
|
|||
self.assertEqual(response['total_pages'], response_one['total_pages'])
|
||||
|
||||
|
||||
class ConcurrentWriteTest(unittest.TestCase):
|
||||
@staticmethod
|
||||
def make_comment(num):
|
||||
return {
|
||||
'jsonrpc': '2.0',
|
||||
'id': num,
|
||||
'method': 'create_comment',
|
||||
'params': {
|
||||
'comment': f'Comment #{num}',
|
||||
'claim_id': '6d266af6c25c80fa2ac6cc7662921ad2e90a07e7',
|
||||
}
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def send_comment_to_server(params):
|
||||
with requests.post(params[0], json=params[1]) as req:
|
||||
return req.json()
|
||||
|
||||
|
||||
replace = {
|
||||
'claim_id': fake.sha1,
|
||||
'comment': fake.text,
|
||||
'channel_id': fake.sha1,
|
||||
'channel_name': fake_lbryusername,
|
||||
'signature': fake.uuid4,
|
||||
'parent_id': fake.sha256
|
||||
}
|
||||
|
||||
|
||||
def create_test_comments(values: iter, **default):
|
||||
vars_combo = chain.from_iterable(combinations(values, r) for r in range(1, len(values) + 1))
|
||||
return [{k: replace[k]() if k in comb else v for k, v in default.items()}
|
||||
for comb in vars_combo]
|
||||
|
||||
|
||||
|
||||
def test01Concurrency(self):
|
||||
urls = [f'http://localhost:{port}/api' for port in range(5921, 5925)]
|
||||
comments = [self.make_comment(i) for i in range(1, 5)]
|
||||
inputs = list(zip(urls, comments))
|
||||
with Pool(4) as pool:
|
||||
results = pool.map(self.send_comment_to_server, inputs)
|
||||
results = list(filter(lambda x: 'comment_id' in x['result'], results))
|
||||
self.assertIsNotNone(results)
|
||||
self.assertEqual(len(results), len(inputs))
|
||||
|
|
|
@ -6,8 +6,8 @@ from unittest.case import _Outcome
|
|||
import asyncio
|
||||
|
||||
from schema.db_helpers import setup_database, teardown_database
|
||||
from src.database import obtain_connection
|
||||
from src.settings import config
|
||||
from server.database import obtain_connection
|
||||
from settings import config
|
||||
|
||||
|
||||
class AsyncioTestCase(unittest.TestCase):
|
||||
|
|
Loading…
Add table
Reference in a new issue