Better validation logic;

This commit is contained in:
Oleg Silkin 2020-01-03 18:16:19 -05:00 committed by Lex Berezhny
parent b31881f424
commit ebf2e7ee70

View file

@ -47,10 +47,6 @@ class MockedCommentServer:
schema.update(**kwargs) schema.update(**kwargs)
return schema return schema
@staticmethod
def clean(d: dict):
return {k: v for k, v in d.items() if v or isinstance(v, bool)}
def create_comment(self, channel_name=None, channel_id=None, **kwargs): def create_comment(self, channel_name=None, channel_id=None, **kwargs):
comment_id = self.comment_id comment_id = self.comment_id
channel_url = 'lbry://' + channel_name + '#' + channel_id if channel_id else None channel_url = 'lbry://' + channel_name + '#' + channel_id if channel_id else None
@ -68,8 +64,9 @@ class MockedCommentServer:
def abandon_comment(self, comment_id: int, channel_id: str, **kwargs): def abandon_comment(self, comment_id: int, channel_id: str, **kwargs):
deleted = False deleted = False
comment_id = self.get_comment_id(comment_id)
try: try:
if 0 <= comment_id < len(self.comments) and self.comments[comment_id]['channel_id'] == channel_id: if self.comments[comment_id]['channel_id'] == channel_id:
self.comments.pop(comment_id) self.comments.pop(comment_id)
deleted = True deleted = True
finally: finally:
@ -79,6 +76,130 @@ class MockedCommentServer:
} }
} }
def edit_comment(self, comment_id: typing.Union[str, int], comment: str, channel_id: str,
channel_name: str, signature: str, signing_ts: str) -> dict:
edited = False
if self.credentials_are_valid(channel_id, channel_name, signature, signing_ts) \
and self.is_valid_body(comment):
comment_id = self.get_comment_id(comment_id)
if self.comments[comment_id]['channel_id'] == channel_id:
self.comments[comment_id].update({
'comment': comment,
'signature': signature,
'signing_ts': signing_ts
})
edited = True
return self.comments[comment_id] if edited else None
def hide_comment(self, comment_id: typing.Union[int, str], signing_ts: str, signature: str):
comment_id = self.get_comment_id(comment_id)
if self.is_signable(signature, signing_ts):
self.comments[comment_id]['is_hidden'] = True
return True
return False
def hide_comments(self, pieces: list):
comments_hidden = []
for p in pieces:
if self.hide_comment(**p):
comments_hidden.append(p['comment_id'])
return {'hidden': comments_hidden}
def get_claim_comments(self, claim_id, page=1, page_size=50,**kwargs):
comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments))
return {
'page': page,
'page_size': page_size,
'total_pages': ceil(len(comments)/page_size),
'total_items': len(comments),
'items': [self.clean(c) for c in (comments[::-1])[(page - 1) * page_size: page * page_size]],
'has_hidden_comments': bool(list(filter(lambda x: x['is_hidden'], comments)))
}
def get_claim_hidden_comments(self, claim_id, hidden=True, page=1, page_size=50):
comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments))
select_comments = list(filter(lambda c: c['is_hidden'] == hidden, comments))
return {
'page': page,
'page_size': page_size,
'total_pages': ceil(len(select_comments) / page_size),
'total_items': len(select_comments),
'items': [self.clean(c) for c in (select_comments[::-1])[(page - 1) * page_size: page * page_size]],
'has_hidden_comments': bool(list(filter(lambda c: c['is_hidden'], comments)))
}
def get_comment_channel_by_id(self, comment_id: int, **kwargs):
comment = self.comments[self.get_comment_id(comment_id)]
return {
'channel_id': comment.get('channel_id'),
'channel_name': comment.get('channel_name')
}
def get_comments_by_id(self, comment_ids: list):
return [self.comments[self.get_comment_id(cid)] for cid in comment_ids]
methods = {
'get_claim_comments': get_claim_comments,
'get_comments_by_id': get_comments_by_id,
'create_comment': create_comment,
'abandon_comment': abandon_comment,
'get_channel_from_comment_id': get_comment_channel_by_id,
'get_claim_hidden_comments': get_claim_hidden_comments,
'hide_comments': hide_comments,
'edit_comment': edit_comment,
}
def process_json(self, body) -> dict:
response = {'jsonrpc': '2.0', 'id': body['id']}
error = None
try:
if body['method'] in self.methods:
params: dict = body.get('params', {})
result = self.methods[body['method']](self, **params)
response['result'] = result
else:
response['error'] = self.ERRORS['INVALID_METHOD']
except (ValueError, TypeError) as err:
error = err
response['error'] = self.ERRORS['INVALID_PARAMS']
except Exception as err:
error = err
response['error'] = self.ERRORS['UNKNOWN']
finally:
if 'error' in response:
response['error'].update({'exception': f'{type(error).__name__}: {error}'})
return response
async def start(self):
self.runner = web.AppRunner(self.app)
await self.runner.setup()
self.server = web.TCPSite(self.runner, 'localhost', self.port)
await self.server.start()
async def stop(self):
await self.runner.shutdown()
await self.runner.cleanup()
async def api(self, request):
body = await request.json()
if type(body) is list or type(body) is dict:
if type(body) is list:
response = [self.process_json(part) for part in body]
else:
response = self.process_json(body)
return web.json_response(response)
else:
raise TypeError('invalid type passed')
@staticmethod
def clean(d: dict):
return {k: v for k, v in d.items() if v or isinstance(v, bool)}
@staticmethod @staticmethod
def is_valid_body(comment) -> bool: def is_valid_body(comment) -> bool:
return 0 < len(comment) <= 2000 return 0 < len(comment) <= 2000
@ -91,6 +212,11 @@ class MockedCommentServer:
return 0 <= comment_id < len(self.comments) return 0 <= comment_id < len(self.comments)
return False return False
def get_comment_id(self, cid: typing.Union[int, str, any]) -> int:
if not self.is_valid_comment_id(cid):
raise ValueError('Comment ID is Invalid')
return cid if isinstance(cid, int) else int(cid)
@staticmethod @staticmethod
def claim_id_is_valid(claim_id: str) -> bool: def claim_id_is_valid(claim_id: str) -> bool:
return re.fullmatch('([a-z0-9]{40}|[A-Z0-9]{40})', claim_id) is not None return re.fullmatch('([a-z0-9]{40}|[A-Z0-9]{40})', claim_id) is not None
@ -131,108 +257,6 @@ class MockedCommentServer:
claim_id is not None and self.claim_id_is_valid(claim_id) and \ claim_id is not None and self.claim_id_is_valid(claim_id) and \
(parent_id is None or self.is_valid_comment_id(parent_id)) (parent_id is None or self.is_valid_comment_id(parent_id))
def edit_comment(self, comment_id: str, comment: str, channel_id: str,
channel_name: str, signature: str, signing_ts: str) -> dict:
pass
def hide_comment(self, comment_id: typing.Union[int, str], signing_ts: str, signature: str):
comment_id = int(comment_id) if not isinstance(comment_id, int) else comment_id
if self.is_valid_comment_id(comment_id) and self.is_signable(signature, signing_ts):
self.comments[comment_id]['is_hidden'] = True
return True
return False
def hide_comments(self, pieces: list):
comments_hidden = []
for p in pieces:
if self.hide_comment(**p):
comments_hidden.append(p['comment_id'])
return {'hidden': comments_hidden}
def get_claim_comments(self, claim_id, page=1, page_size=50,**kwargs):
comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments))
return {
'page': page,
'page_size': page_size,
'total_pages': ceil(len(comments)/page_size),
'total_items': len(comments),
'items': [self.clean(c) for c in (comments[::-1])[(page - 1) * page_size: page * page_size]],
'has_hidden_comments': bool(list(filter(lambda x: x['is_hidden'], comments)))
}
def get_claim_hidden_comments(self, claim_id, hidden=True, page=1, page_size=50):
comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments))
select_comments = list(filter(lambda c: c['is_hidden'] == hidden, comments))
return {
'page': page,
'page_size': page_size,
'total_pages': ceil(len(select_comments) / page_size),
'total_items': len(select_comments),
'items': [self.clean(c) for c in (select_comments[::-1])[(page - 1) * page_size: page * page_size]],
'has_hidden_comments': bool(list(filter(lambda c: c['is_hidden'], comments)))
}
def get_comment_channel_by_id(self, comment_id: int, **kwargs):
comment = self.comments[comment_id]
return {
'channel_id': comment.get('channel_id'),
'channel_name': comment.get('channel_name')
}
def get_comments_by_id(self, comment_ids: list):
comment_ids = [int(c) if not isinstance(c, int) else c for c in comment_ids]
comments = [self.comments[cmnt_id] for cmnt_id in comment_ids if 0 <= cmnt_id < len(self.comments)]
return comments
methods = {
'get_claim_comments': get_claim_comments,
'get_comments_by_id': get_comments_by_id,
'create_comment': create_comment,
'abandon_comment': abandon_comment,
'get_channel_from_comment_id': get_comment_channel_by_id,
'get_claim_hidden_comments': get_claim_hidden_comments,
'hide_comments': hide_comments,
}
def process_json(self, body) -> dict:
response = {'jsonrpc': '2.0', 'id': body['id']}
try:
if body['method'] in self.methods:
params: dict = body.get('params', {})
comment_id = params.get('comment_id')
if comment_id and not isinstance(comment_id, int):
params['comment_id'] = int(comment_id)
result = self.methods[body['method']](self, **params)
response['result'] = result
else:
response['error'] = self.ERRORS['INVALID_METHOD']
except Exception as err:
response['error'] = self.ERRORS['UNKNOWN']
response['error'].update({'exception': f'{type(err).__name__}: {err}'})
return response
async def start(self):
self.runner = web.AppRunner(self.app)
await self.runner.setup()
self.server = web.TCPSite(self.runner, 'localhost', self.port)
await self.server.start()
async def stop(self):
await self.runner.shutdown()
await self.runner.cleanup()
async def api(self, request):
body = await request.json()
if type(body) is list or type(body) is dict:
if type(body) is list:
response = [self.process_json(part) for part in body]
else:
response = self.process_json(body)
return web.json_response(response)
else:
raise TypeError('invalid type passed')
class CommentCommands(CommandTestCase): class CommentCommands(CommandTestCase):