Moves async code around

This commit is contained in:
Oleg Silkin 2019-05-26 00:42:39 -04:00
parent 37888db901
commit fdf385acc5
6 changed files with 23 additions and 76 deletions

View file

@ -1,19 +1,17 @@
from distutils.core import setup from distutils.core import setup
from distutils.extension import Extension from distutils.extension import Extension
from Cython.Distutils import build_ext from Cython.Distutils import build_ext
from Cython.Build import cythonize
ext_modules = [ ext_modules = [
Extension("src.database", ["src/database.py"]), Extension("src.database", ["src/database.py"]),
Extension("src.settings", ["src/settings.py"]), Extension("src.settings", ["src/settings.py"]),
Extension("src.writes", ["src/writes.py"]), Extension("src.writes", ["src/writes.py"]),
Extension("src.handles", ["src/handles.py"]),
Extension("schema.db_helpers", ["schema/db_helpers.py"]), Extension("schema.db_helpers", ["schema/db_helpers.py"]),
Extension("src.app", ["src/app.py"]) ] # might need to add some external imports here too
] # might need to add some external imports here too
setup( setup(
name="comment_server", name="comment_server",
cmdclass={"build_ext": build_ext}, cmdclass={"build_ext": build_ext},
ext_modules=ext_modules, ext_modules=cythonize(ext_modules, compiler_directives={'language_level': '3'})
compiler_directives={'language_level': '3'}
) )

View file

@ -1,3 +1,4 @@
# cython: language_level=3
import logging import logging
import aiojobs.aiohttp import aiojobs.aiohttp
@ -8,8 +9,9 @@ import re
import schema.db_helpers import schema.db_helpers
from src.database import obtain_connection from src.database import obtain_connection
from src.handles import api_endpoint from src.handles import api_endpoint
from src.handles import create_comment_scheduler
from src.settings import config from src.settings import config
from src.writes import create_comment_scheduler, DatabaseWriter from src.writes import DatabaseWriter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View file

@ -1,10 +1,10 @@
# cython: language_level=3
import logging import logging
import re import re
import sqlite3 import sqlite3
import time import time
import typing import typing
import aiosqlite
import nacl.hash import nacl.hash
from src.settings import config from src.settings import config
@ -152,61 +152,6 @@ def get_comments_by_id(conn, comment_ids: list) -> typing.Union[list, None]:
)] )]
async def _insert_channel_async(db_file: str, channel_name: str, channel_id: str):
async with aiosqlite.connect(db_file) as db:
await db.execute('INSERT INTO CHANNEL(ClaimId, Name) VALUES (?, ?)',
(channel_id, channel_name))
await db.commit()
async def _insert_comment_async(db_file: str, claim_id: str = None, comment: str = None,
channel_id: str = None, signature: str = None, parent_id: str = None) -> str:
timestamp = time.time_ns()
comment_prehash = ':'.join((claim_id, comment, str(timestamp),))
comment_prehash = bytes(comment_prehash.encode('utf-8'))
comment_id = nacl.hash.sha256(comment_prehash).decode('utf-8')
async with aiosqlite.connect(db_file) as db:
await db.execute(
"""
INSERT INTO COMMENT(CommentId, LbryClaimId, ChannelId, Body,
ParentId, Signature, Timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(comment_id, claim_id, channel_id, comment, parent_id, signature, timestamp)
)
await db.commit()
return comment_id
async def create_comment_async(db_file: str, comment: str, claim_id: str, **kwargs):
channel_id = kwargs.pop('channel_id', '')
channel_name = kwargs.pop('channel_name', '')
if channel_id or channel_name:
try:
validate_input(
comment=comment,
claim_id=claim_id,
channel_id=channel_id,
channel_name=channel_name,
)
await _insert_channel_async(db_file, channel_name, channel_id)
except AssertionError:
raise TypeError('Invalid parameters given to input validation')
else:
channel_id = config['ANONYMOUS']['CHANNEL_ID']
comment_id = await _insert_comment_async(
db_file=db_file, comment=comment, claim_id=claim_id, channel_id=channel_id, **kwargs
)
async with aiosqlite.connect(db_file) as db:
db.row_factory = aiosqlite.Row
curs = await db.execute(
'SELECT * FROM COMMENTS_ON_CLAIMS WHERE comment_id = ?', (comment_id,)
)
thing = await curs.fetchone()
await curs.close()
return dict(thing) if thing else None
if __name__ == '__main__': if __name__ == '__main__':
pass pass
# __generate_database_schema(connection, 'comments_ddl.sql') # __generate_database_schema(connection, 'comments_ddl.sql')

View file

@ -1,14 +1,18 @@
# cython: language_level=3
import json import json
import logging import logging
import asyncio import asyncio
import aiojobs
from asyncio import coroutine
from aiohttp import web from aiohttp import web
from aiojobs.aiohttp import atomic from aiojobs.aiohttp import atomic
from src.writes import write_comment from src.writes import DatabaseWriter
from src.database import get_claim_comments from src.database import get_claim_comments
from src.database import get_comments_by_id, get_comment_ids from src.database import get_comments_by_id, get_comment_ids
from src.database import obtain_connection from src.database import obtain_connection
from src.database import create_comment
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -38,6 +42,15 @@ def handle_get_comments_by_id(app, **kwargs):
return get_comments_by_id(conn, **kwargs) return get_comments_by_id(conn, **kwargs)
async def create_comment_scheduler():
return await aiojobs.create_scheduler(limit=1, pending_limit=0)
async def write_comment(**comment):
with DatabaseWriter._writer.connection as conn:
return await coroutine(create_comment)(conn, **comment)
async def handle_create_comment(scheduler, **kwargs): async def handle_create_comment(scheduler, **kwargs):
job = await scheduler.spawn(write_comment(**kwargs)) job = await scheduler.spawn(write_comment(**kwargs))
return await job.wait() return await job.wait()

View file

@ -1,3 +1,4 @@
# cython: language_level=3
import json import json
import logging import logging
import pathlib import pathlib

View file

@ -1,10 +1,7 @@
import atexit import atexit
import logging import logging
import aiojobs from src.database import obtain_connection
from asyncio import coroutine
from src.database import obtain_connection, create_comment
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -31,12 +28,3 @@ class DatabaseWriter(object):
@property @property
def connection(self): def connection(self):
return self.conn return self.conn
async def create_comment_scheduler():
return await aiojobs.create_scheduler(limit=1, pending_limit=0)
async def write_comment(**comment):
with DatabaseWriter._writer.connection as conn:
return await coroutine(create_comment)(conn, **comment)