diff --git a/lbry/lbry/extras/daemon/exchange_rate_manager.py b/lbry/lbry/extras/daemon/exchange_rate_manager.py index 62dc4cab8..584b44a5a 100644 --- a/lbry/lbry/extras/daemon/exchange_rate_manager.py +++ b/lbry/lbry/extras/daemon/exchange_rate_manager.py @@ -4,6 +4,7 @@ import logging import json from decimal import Decimal from typing import Optional +from json.decoder import JSONDecodeError from aiohttp.client_exceptions import ClientError from lbry.error import InvalidExchangeRateResponseError, CurrencyConversionError from lbry.utils import aiohttp_request @@ -34,17 +35,59 @@ class ExchangeRate: return {'spot': self.spot, 'ts': self.ts} -class MarketFeed: - REQUESTS_TIMEOUT = 20 - EXCHANGE_RATE_UPDATE_RATE_SEC = 300 +class Deserializer: + def __init__(self, content_type): + self.content_type = content_type + self._deserializer = self._get_deserializer(content_type) - def __init__(self, market: str, name: str, url: str, params, fee): - self.market = market - self.name = name + def deserialize(self, body): + return self._deserializer(body) + + def _get_deserializer(self, content_type): + if content_type == 'json': + return self._deserialize_json + else: + raise ValueError('Content type {content_type} is not supported') + + def _deserialize_json(self, body): + try: + return json.loads(body) + except (ValueError, JSONDecodeError): + log.error('Failed to deserialize response body: %s', body) + raise + + +class AioHttpManager: + REQUESTS_TIMEOUT = 20 + + def __init__(self, url, params, content_type): self.url = url self.params = params + self.content_type = content_type + + async def _make_request(self): + async with aiohttp_request('get', self.url, params=self.params) as response: + return await response + + async def get_response_body(self): + response = await asyncio.wait_for(self._make_request(), self.REQUESTS_TIMEOUT) + if self.content_type not in response.headers.get('Content-Type'): + raise InvalidExchangeRateResponse(self.url, f'Received response is not of type {self.content_type}') + return response.read().decode() + + +class MarketFeed: + EXCHANGE_RATE_UPDATE_RATE_SEC = 300 + + def __init__(self, market: str, name: str, url: str, params: dict, fee: float, + content_type: str = 'json', network_manager=AioHttpManager, + deserializer=Deserializer): + self.market = market + self.name = name self.fee = fee self.rate = None + self._network_manager = network_manager(url, params, content_type) + self._deserializer = deserializer(content_type) self._task: Optional[asyncio.Task] = None self._online = True @@ -54,11 +97,13 @@ class MarketFeed: def is_online(self): return self._online - async def _make_request(self) -> str: - async with aiohttp_request('get', self.url, params=self.params) as response: - return (await response.read()).decode() + def _on_error(self, err): + log.warning("There was a problem updating %s exchange rate information from %s", + self.market, self.name) + log.debug("Exchange rate error (%s from %s): %s", self.market, self.name, err) + self._online = False - def _handle_response(self, response): + def _handle_response(self, body): raise NotImplementedError() def _subtract_fee(self, from_amount): @@ -70,24 +115,24 @@ class MarketFeed: self.rate = ExchangeRate(self.market, price, int(time.time())) self._online = True - def _on_error(self, err): - log.warning("There was a problem updating %s exchange rate information from %s", - self.market, self.name) - log.debug("Exchange rate error (%s from %s): %s", self.market, self.name, err) - self._online = False + async def _get_current_price(self): + body = self._deserializer.deserialize(await self._network_manager.get_response_body()) + return self._subtract_fee(self._handle_response(body)) async def _update_price(self): + try: + self._save_price(await self._get_current_price()) + except (asyncio.TimeoutError, InvalidExchangeRateResponseError, ClientError) as err: + self._on_error(err) + + async def _keep_updated(self): while True: - try: - response = await asyncio.wait_for(self._make_request(), self.REQUESTS_TIMEOUT) - self._save_price(self._subtract_fee(self._handle_response(response))) - except (asyncio.TimeoutError, InvalidExchangeRateResponseError, ClientError) as err: - self._on_error(err) + self._update_price() await asyncio.sleep(self.EXCHANGE_RATE_UPDATE_RATE_SEC) def start(self): if not self._task: - self._task = asyncio.create_task(self._update_price()) + self._task = asyncio.create_task(self._keep_updated) def stop(self): if self._task and not self._task.done(): @@ -105,8 +150,7 @@ class BittrexFeed(MarketFeed): BITTREX_FEE ) - def _handle_response(self, response): - json_response = json.loads(response) + def _handle_response(self, json_response): if 'result' not in json_response: raise InvalidExchangeRateResponseError(self.name, 'result not found') trades = json_response['result'] @@ -130,8 +174,7 @@ class LBRYioFeed(MarketFeed): 0.0, ) - def _handle_response(self, response): - json_response = json.loads(response) + def _handle_response(self, json_response): if 'data' not in json_response: raise InvalidExchangeRateResponseError(self.name, 'result not found') return 1.0 / json_response['data']['lbc_btc'] @@ -147,11 +190,7 @@ class LBRYioBTCFeed(MarketFeed): 0.0, ) - def _handle_response(self, response): - try: - json_response = json.loads(response) - except ValueError: - raise InvalidExchangeRateResponseError(self.name, "invalid rate response : %s" % response) + def _handle_response(self, json_response): if 'data' not in json_response: raise InvalidExchangeRateResponseError(self.name, 'result not found') return 1.0 / json_response['data']['btc_usd'] @@ -167,11 +206,7 @@ class CryptonatorBTCFeed(MarketFeed): 0.0, ) - def _handle_response(self, response): - try: - json_response = json.loads(response) - except ValueError: - raise InvalidExchangeRateResponseError(self.name, "invalid rate response") + def _handle_response(self, json_response): if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \ 'success' not in json_response or json_response['success'] is not True: raise InvalidExchangeRateResponseError(self.name, 'result not found') @@ -188,11 +223,7 @@ class CryptonatorFeed(MarketFeed): 0.0, ) - def _handle_response(self, response): - try: - json_response = json.loads(response) - except ValueError: - raise InvalidExchangeRateResponseError(self.name, "invalid rate response") + def _handle_response(self, json_response): if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \ 'success' not in json_response or json_response['success'] is not True: raise InvalidExchangeRateResponseError(self.name, 'result not found') diff --git a/lbry/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py b/lbry/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py index 23e9c351a..0860fe0b7 100644 --- a/lbry/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py +++ b/lbry/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py @@ -1,7 +1,9 @@ import unittest +import logging from decimal import Decimal from lbry.schema.claim import Claim from lbry.extras.daemon import exchange_rate_manager +from torba.testcase import AsyncioTestCase from lbry.error import InvalidExchangeRateResponseError from tests import test_utils @@ -93,18 +95,25 @@ class LBRYioFeedTest(unittest.TestCase): def test_handle_response(self): feed = exchange_rate_manager.LBRYioFeed() - response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \ - '\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \ - '\"success\": true, \"error\": null}' + response = { + 'data': { + 'fresh': 0, 'lbc_usd': 0.05863062523378918, 'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498 + }, + 'success': True, + 'error': None + } out = feed._handle_response(response) expected = 1.0 / 5.065289549855739e-05 self.assertEqual(expected, out) - response = '{}' + response = {} with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) - response = '{"success":true,"result":[]}' + response = { + "success": True, + "result": [] + } with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) @@ -113,45 +122,66 @@ class TestExchangeRateFeeds(unittest.TestCase): def test_handle_lbryio_btc_response(self): feed = exchange_rate_manager.LBRYioBTCFeed() - response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \ - '\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \ - '\"success\": true, \"error\": null}' + response = { + 'data': { + 'fresh': 0, 'lbc_usd': 0.05863062523378918, 'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498 + }, + 'success': True, + 'error': None + } out = feed._handle_response(response) expected = 1.0 / 1157.498 self.assertEqual(expected, out) - response = '{}' + response = {} with self.assertRaises(InvalidExchangeRateResponseError): out = feed._handle_response(response) - response = '{"success":true,"result":[]}' + response = { + "success": True, + "result": {} + } with self.assertRaises(InvalidExchangeRateResponseError): out = feed._handle_response(response) def test_handle_cryptonator_lbc_response(self): feed = exchange_rate_manager.CryptonatorFeed() - response = '{\"ticker\":{\"base\":\"BTC\",\"target\":\"LBC\",\"price\":\"23657.44026496\"' \ - ',\"volume\":\"\",\"change\":\"-5.59806916\"},\"timestamp\":1507470422' \ - ',\"success\":true,\"error\":\"\"}' + response = { + 'ticker': { + 'base': 'BTC', 'target': 'LBC', 'price': 23657.44026496, 'volume': '', 'change': -5.59806916, + }, + 'timestamp': 1507470422, + 'success': True, + 'error': "" + } out = feed._handle_response(response) expected = 23657.44026496 self.assertEqual(expected, out) - response = '{}' + response = {} with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) - response = '{"success":true,"ticker":{}}' + response = { + "success": True, + "ticker": {} + } with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) def test_handle_cryptonator_btc_response(self): feed = exchange_rate_manager.CryptonatorBTCFeed() - response = '{\"ticker\":{\"base\":\"USD\",\"target\":\"BTC\",\"price\":\"0.00022123\",' \ - '\"volume\":\"\",\"change\":\"-0.00000259\"},\"timestamp\":1507471141,' \ - '\"success\":true,\"error\":\"\"}' + response = { + 'ticker': { + 'base': 'BTC', 'target': 'LBC', 'price': 0.00022123, 'volume': '', 'change': -0.00000259, + }, + 'timestamp': 1507471141, + 'success': True, + 'error': '' + } + out = feed._handle_response(response) expected = 0.00022123 self.assertEqual(expected, out) @@ -160,26 +190,125 @@ class TestExchangeRateFeeds(unittest.TestCase): with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) - response = '{"success":true,"ticker":{}}' + response = { + "success": True, + "ticker": {} + } with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) def test_handle_bittrex_response(self): feed = exchange_rate_manager.BittrexFeed() - response = '{"success":true,"message":"","result":[{"Id":6902471,"TimeStamp":"2017-02-2'\ - '7T23:41:52.213","Quantity":56.12611239,"Price":0.00001621,"Total":0.00090980,"FillType":"'\ - 'PARTIAL_FILL","OrderType":"SELL"},{"Id":6902403,"TimeStamp":"2017-02-27T23:31:40.463","Qu'\ - 'antity":430.99988180,"Price":0.00001592,"Total":0.00686151,"FillType":"PARTIAL_FILL","Ord'\ - 'erType":"SELL"}]}' + response = { + "success": True, + "message": "", + "result": [ + { + 'Id': 6902471, 'TimeStamp': '2017-02-27T23:41:52.213', 'Quantity': 56.12611239, + "Price": 0.00001621, "Total": 0.00090980, "FillType": "PARTIAL_FILL", "OrderType": "SELL" + }, + { + "Id": 6902403, "TimeStamp": "2017-02-27t23:31:40.463", "Quantity": 430.99988180, + "Price": 0.00001592, "Total": 0.00686151, "FillType": "PARTIAL_FILL", "OrderType": "SELL" + } + ] + } out = feed._handle_response(response) expected = 1.0 / ((0.00090980+0.00686151) / (56.12611239+430.99988180)) self.assertEqual(expected, out) - response = '{}' + response = {} with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) - response = '{"success":true,"result":[]}' + response = { + "success": True, + "result": [] + } with self.assertRaises(InvalidExchangeRateResponseError): feed._handle_response(response) + + +class TestMarketFeed(AsyncioTestCase): + def setUp(self): + self.feed = exchange_rate_manager.MarketFeed('some market', 'some name', 'some url', {'param': 1}, 0.005) + + def test_save_price(self): + with self.assertLogs('lbry.extras.daemon.exchange_rate_manager', logging.DEBUG) as cm: + self.feed._save_price(1) + self.assertIn(self.feed.market, ' '.join(cm.output)) + self.assertIn(self.feed.name, ' '.join(cm.output)) + self.assertTrue(self.feed.is_online()) + self.assertIsNotNone(self.feed.rate) + + with self.assertRaises(ValueError): + self.feed._save_price(0) + + with self.assertRaises(TypeError): + self.feed._save_price('not a price') + + async def test_update_price(self): + def mock_handle_response(json_obj): + return json_obj['data']['lbc_btc'] + + async def get_response_body_mock(self): + return '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \ + '\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \ + '\"success\": true, \"error\": null}' + + self.feed._handle_response = mock_handle_response + + with unittest.mock.patch.object( + exchange_rate_manager.AioHttpManager, 'get_response_body', get_response_body_mock + ): + await self.feed._update_price() + self.assertEqual(self.feed.rate.spot, 5.090743266186672e-05) + self.assertTrue(self.feed.is_online()) + + async def get_response_body_mock(self): + return '