diff --git a/tests/integration/other/test_comment_commands.py b/tests/integration/other/test_comment_commands.py index 4524fa827..66c2aa944 100644 --- a/tests/integration/other/test_comment_commands.py +++ b/tests/integration/other/test_comment_commands.py @@ -47,10 +47,6 @@ class MockedCommentServer: schema.update(**kwargs) return schema - @staticmethod - def clean(d: dict): - return {k: v for k, v in d.items() if v or isinstance(v, bool)} - def create_comment(self, channel_name=None, channel_id=None, **kwargs): comment_id = self.comment_id channel_url = 'lbry://' + channel_name + '#' + channel_id if channel_id else None @@ -68,8 +64,9 @@ class MockedCommentServer: def abandon_comment(self, comment_id: int, channel_id: str, **kwargs): deleted = False + comment_id = self.get_comment_id(comment_id) try: - if 0 <= comment_id < len(self.comments) and self.comments[comment_id]['channel_id'] == channel_id: + if self.comments[comment_id]['channel_id'] == channel_id: self.comments.pop(comment_id) deleted = True finally: @@ -79,6 +76,130 @@ class MockedCommentServer: } } + def edit_comment(self, comment_id: typing.Union[str, int], comment: str, channel_id: str, + channel_name: str, signature: str, signing_ts: str) -> dict: + edited = False + if self.credentials_are_valid(channel_id, channel_name, signature, signing_ts) \ + and self.is_valid_body(comment): + comment_id = self.get_comment_id(comment_id) + if self.comments[comment_id]['channel_id'] == channel_id: + self.comments[comment_id].update({ + 'comment': comment, + 'signature': signature, + 'signing_ts': signing_ts + }) + edited = True + + return self.comments[comment_id] if edited else None + + def hide_comment(self, comment_id: typing.Union[int, str], signing_ts: str, signature: str): + comment_id = self.get_comment_id(comment_id) + if self.is_signable(signature, signing_ts): + self.comments[comment_id]['is_hidden'] = True + return True + return False + + def hide_comments(self, pieces: list): + comments_hidden = [] + for p in pieces: + if self.hide_comment(**p): + comments_hidden.append(p['comment_id']) + return {'hidden': comments_hidden} + + def get_claim_comments(self, claim_id, page=1, page_size=50,**kwargs): + comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments)) + return { + 'page': page, + 'page_size': page_size, + 'total_pages': ceil(len(comments)/page_size), + 'total_items': len(comments), + 'items': [self.clean(c) for c in (comments[::-1])[(page - 1) * page_size: page * page_size]], + 'has_hidden_comments': bool(list(filter(lambda x: x['is_hidden'], comments))) + } + + def get_claim_hidden_comments(self, claim_id, hidden=True, page=1, page_size=50): + comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments)) + select_comments = list(filter(lambda c: c['is_hidden'] == hidden, comments)) + return { + 'page': page, + 'page_size': page_size, + 'total_pages': ceil(len(select_comments) / page_size), + 'total_items': len(select_comments), + 'items': [self.clean(c) for c in (select_comments[::-1])[(page - 1) * page_size: page * page_size]], + 'has_hidden_comments': bool(list(filter(lambda c: c['is_hidden'], comments))) + } + + def get_comment_channel_by_id(self, comment_id: int, **kwargs): + comment = self.comments[self.get_comment_id(comment_id)] + return { + 'channel_id': comment.get('channel_id'), + 'channel_name': comment.get('channel_name') + } + + def get_comments_by_id(self, comment_ids: list): + return [self.comments[self.get_comment_id(cid)] for cid in comment_ids] + + methods = { + 'get_claim_comments': get_claim_comments, + 'get_comments_by_id': get_comments_by_id, + 'create_comment': create_comment, + 'abandon_comment': abandon_comment, + 'get_channel_from_comment_id': get_comment_channel_by_id, + 'get_claim_hidden_comments': get_claim_hidden_comments, + 'hide_comments': hide_comments, + 'edit_comment': edit_comment, + } + + def process_json(self, body) -> dict: + response = {'jsonrpc': '2.0', 'id': body['id']} + error = None + try: + if body['method'] in self.methods: + params: dict = body.get('params', {}) + result = self.methods[body['method']](self, **params) + response['result'] = result + else: + response['error'] = self.ERRORS['INVALID_METHOD'] + + except (ValueError, TypeError) as err: + error = err + response['error'] = self.ERRORS['INVALID_PARAMS'] + + except Exception as err: + error = err + response['error'] = self.ERRORS['UNKNOWN'] + + finally: + if 'error' in response: + response['error'].update({'exception': f'{type(error).__name__}: {error}'}) + + return response + + async def start(self): + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.server = web.TCPSite(self.runner, 'localhost', self.port) + await self.server.start() + + async def stop(self): + await self.runner.shutdown() + await self.runner.cleanup() + + async def api(self, request): + body = await request.json() + if type(body) is list or type(body) is dict: + if type(body) is list: + response = [self.process_json(part) for part in body] + else: + response = self.process_json(body) + return web.json_response(response) + else: + raise TypeError('invalid type passed') + + @staticmethod + def clean(d: dict): + return {k: v for k, v in d.items() if v or isinstance(v, bool)} + @staticmethod def is_valid_body(comment) -> bool: return 0 < len(comment) <= 2000 @@ -91,6 +212,11 @@ class MockedCommentServer: return 0 <= comment_id < len(self.comments) return False + def get_comment_id(self, cid: typing.Union[int, str, any]) -> int: + if not self.is_valid_comment_id(cid): + raise ValueError('Comment ID is Invalid') + return cid if isinstance(cid, int) else int(cid) + @staticmethod def claim_id_is_valid(claim_id: str) -> bool: return re.fullmatch('([a-z0-9]{40}|[A-Z0-9]{40})', claim_id) is not None @@ -131,108 +257,6 @@ class MockedCommentServer: claim_id is not None and self.claim_id_is_valid(claim_id) and \ (parent_id is None or self.is_valid_comment_id(parent_id)) - def edit_comment(self, comment_id: str, comment: str, channel_id: str, - channel_name: str, signature: str, signing_ts: str) -> dict: - pass - - - def hide_comment(self, comment_id: typing.Union[int, str], signing_ts: str, signature: str): - comment_id = int(comment_id) if not isinstance(comment_id, int) else comment_id - if self.is_valid_comment_id(comment_id) and self.is_signable(signature, signing_ts): - self.comments[comment_id]['is_hidden'] = True - return True - return False - - def hide_comments(self, pieces: list): - comments_hidden = [] - for p in pieces: - if self.hide_comment(**p): - comments_hidden.append(p['comment_id']) - return {'hidden': comments_hidden} - - def get_claim_comments(self, claim_id, page=1, page_size=50,**kwargs): - comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments)) - return { - 'page': page, - 'page_size': page_size, - 'total_pages': ceil(len(comments)/page_size), - 'total_items': len(comments), - 'items': [self.clean(c) for c in (comments[::-1])[(page - 1) * page_size: page * page_size]], - 'has_hidden_comments': bool(list(filter(lambda x: x['is_hidden'], comments))) - } - - def get_claim_hidden_comments(self, claim_id, hidden=True, page=1, page_size=50): - comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments)) - select_comments = list(filter(lambda c: c['is_hidden'] == hidden, comments)) - return { - 'page': page, - 'page_size': page_size, - 'total_pages': ceil(len(select_comments) / page_size), - 'total_items': len(select_comments), - 'items': [self.clean(c) for c in (select_comments[::-1])[(page - 1) * page_size: page * page_size]], - 'has_hidden_comments': bool(list(filter(lambda c: c['is_hidden'], comments))) - } - - def get_comment_channel_by_id(self, comment_id: int, **kwargs): - comment = self.comments[comment_id] - return { - 'channel_id': comment.get('channel_id'), - 'channel_name': comment.get('channel_name') - } - - def get_comments_by_id(self, comment_ids: list): - comment_ids = [int(c) if not isinstance(c, int) else c for c in comment_ids] - comments = [self.comments[cmnt_id] for cmnt_id in comment_ids if 0 <= cmnt_id < len(self.comments)] - return comments - - methods = { - 'get_claim_comments': get_claim_comments, - 'get_comments_by_id': get_comments_by_id, - 'create_comment': create_comment, - 'abandon_comment': abandon_comment, - 'get_channel_from_comment_id': get_comment_channel_by_id, - 'get_claim_hidden_comments': get_claim_hidden_comments, - 'hide_comments': hide_comments, - } - - def process_json(self, body) -> dict: - response = {'jsonrpc': '2.0', 'id': body['id']} - try: - if body['method'] in self.methods: - params: dict = body.get('params', {}) - comment_id = params.get('comment_id') - if comment_id and not isinstance(comment_id, int): - params['comment_id'] = int(comment_id) - - result = self.methods[body['method']](self, **params) - response['result'] = result - else: - response['error'] = self.ERRORS['INVALID_METHOD'] - except Exception as err: - response['error'] = self.ERRORS['UNKNOWN'] - response['error'].update({'exception': f'{type(err).__name__}: {err}'}) - return response - - async def start(self): - self.runner = web.AppRunner(self.app) - await self.runner.setup() - self.server = web.TCPSite(self.runner, 'localhost', self.port) - await self.server.start() - - async def stop(self): - await self.runner.shutdown() - await self.runner.cleanup() - - async def api(self, request): - body = await request.json() - if type(body) is list or type(body) is dict: - if type(body) is list: - response = [self.process_json(part) for part in body] - else: - response = self.process_json(body) - return web.json_response(response) - else: - raise TypeError('invalid type passed') class CommentCommands(CommandTestCase):