Refactor exchange rate manager

This commit is contained in:
Miroslav Kovar 2019-10-14 11:17:18 +02:00 committed by Lex Berezhny
parent 1e6542d12d
commit 73613d1583
2 changed files with 226 additions and 66 deletions

View file

@ -4,6 +4,7 @@ import logging
import json
from decimal import Decimal
from typing import Optional
from json.decoder import JSONDecodeError
from aiohttp.client_exceptions import ClientError
from lbry.error import InvalidExchangeRateResponseError, CurrencyConversionError
from lbry.utils import aiohttp_request
@ -34,17 +35,59 @@ class ExchangeRate:
return {'spot': self.spot, 'ts': self.ts}
class MarketFeed:
REQUESTS_TIMEOUT = 20
EXCHANGE_RATE_UPDATE_RATE_SEC = 300
class Deserializer:
def __init__(self, content_type):
self.content_type = content_type
self._deserializer = self._get_deserializer(content_type)
def __init__(self, market: str, name: str, url: str, params, fee):
self.market = market
self.name = name
def deserialize(self, body):
return self._deserializer(body)
def _get_deserializer(self, content_type):
if content_type == 'json':
return self._deserialize_json
else:
raise ValueError('Content type {content_type} is not supported')
def _deserialize_json(self, body):
try:
return json.loads(body)
except (ValueError, JSONDecodeError):
log.error('Failed to deserialize response body: %s', body)
raise
class AioHttpManager:
REQUESTS_TIMEOUT = 20
def __init__(self, url, params, content_type):
self.url = url
self.params = params
self.content_type = content_type
async def _make_request(self):
async with aiohttp_request('get', self.url, params=self.params) as response:
return await response
async def get_response_body(self):
response = await asyncio.wait_for(self._make_request(), self.REQUESTS_TIMEOUT)
if self.content_type not in response.headers.get('Content-Type'):
raise InvalidExchangeRateResponse(self.url, f'Received response is not of type {self.content_type}')
return response.read().decode()
class MarketFeed:
EXCHANGE_RATE_UPDATE_RATE_SEC = 300
def __init__(self, market: str, name: str, url: str, params: dict, fee: float,
content_type: str = 'json', network_manager=AioHttpManager,
deserializer=Deserializer):
self.market = market
self.name = name
self.fee = fee
self.rate = None
self._network_manager = network_manager(url, params, content_type)
self._deserializer = deserializer(content_type)
self._task: Optional[asyncio.Task] = None
self._online = True
@ -54,11 +97,13 @@ class MarketFeed:
def is_online(self):
return self._online
async def _make_request(self) -> str:
async with aiohttp_request('get', self.url, params=self.params) as response:
return (await response.read()).decode()
def _on_error(self, err):
log.warning("There was a problem updating %s exchange rate information from %s",
self.market, self.name)
log.debug("Exchange rate error (%s from %s): %s", self.market, self.name, err)
self._online = False
def _handle_response(self, response):
def _handle_response(self, body):
raise NotImplementedError()
def _subtract_fee(self, from_amount):
@ -70,24 +115,24 @@ class MarketFeed:
self.rate = ExchangeRate(self.market, price, int(time.time()))
self._online = True
def _on_error(self, err):
log.warning("There was a problem updating %s exchange rate information from %s",
self.market, self.name)
log.debug("Exchange rate error (%s from %s): %s", self.market, self.name, err)
self._online = False
async def _get_current_price(self):
body = self._deserializer.deserialize(await self._network_manager.get_response_body())
return self._subtract_fee(self._handle_response(body))
async def _update_price(self):
try:
self._save_price(await self._get_current_price())
except (asyncio.TimeoutError, InvalidExchangeRateResponseError, ClientError) as err:
self._on_error(err)
async def _keep_updated(self):
while True:
try:
response = await asyncio.wait_for(self._make_request(), self.REQUESTS_TIMEOUT)
self._save_price(self._subtract_fee(self._handle_response(response)))
except (asyncio.TimeoutError, InvalidExchangeRateResponseError, ClientError) as err:
self._on_error(err)
self._update_price()
await asyncio.sleep(self.EXCHANGE_RATE_UPDATE_RATE_SEC)
def start(self):
if not self._task:
self._task = asyncio.create_task(self._update_price())
self._task = asyncio.create_task(self._keep_updated)
def stop(self):
if self._task and not self._task.done():
@ -105,8 +150,7 @@ class BittrexFeed(MarketFeed):
BITTREX_FEE
)
def _handle_response(self, response):
json_response = json.loads(response)
def _handle_response(self, json_response):
if 'result' not in json_response:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
trades = json_response['result']
@ -130,8 +174,7 @@ class LBRYioFeed(MarketFeed):
0.0,
)
def _handle_response(self, response):
json_response = json.loads(response)
def _handle_response(self, json_response):
if 'data' not in json_response:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
return 1.0 / json_response['data']['lbc_btc']
@ -147,11 +190,7 @@ class LBRYioBTCFeed(MarketFeed):
0.0,
)
def _handle_response(self, response):
try:
json_response = json.loads(response)
except ValueError:
raise InvalidExchangeRateResponseError(self.name, "invalid rate response : %s" % response)
def _handle_response(self, json_response):
if 'data' not in json_response:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
return 1.0 / json_response['data']['btc_usd']
@ -167,11 +206,7 @@ class CryptonatorBTCFeed(MarketFeed):
0.0,
)
def _handle_response(self, response):
try:
json_response = json.loads(response)
except ValueError:
raise InvalidExchangeRateResponseError(self.name, "invalid rate response")
def _handle_response(self, json_response):
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
@ -188,11 +223,7 @@ class CryptonatorFeed(MarketFeed):
0.0,
)
def _handle_response(self, response):
try:
json_response = json.loads(response)
except ValueError:
raise InvalidExchangeRateResponseError(self.name, "invalid rate response")
def _handle_response(self, json_response):
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponseError(self.name, 'result not found')

View file

@ -1,7 +1,9 @@
import unittest
import logging
from decimal import Decimal
from lbry.schema.claim import Claim
from lbry.extras.daemon import exchange_rate_manager
from torba.testcase import AsyncioTestCase
from lbry.error import InvalidExchangeRateResponseError
from tests import test_utils
@ -93,18 +95,25 @@ class LBRYioFeedTest(unittest.TestCase):
def test_handle_response(self):
feed = exchange_rate_manager.LBRYioFeed()
response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}'
response = {
'data': {
'fresh': 0, 'lbc_usd': 0.05863062523378918, 'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498
},
'success': True,
'error': None
}
out = feed._handle_response(response)
expected = 1.0 / 5.065289549855739e-05
self.assertEqual(expected, out)
response = '{}'
response = {}
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = '{"success":true,"result":[]}'
response = {
"success": True,
"result": []
}
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
@ -113,45 +122,66 @@ class TestExchangeRateFeeds(unittest.TestCase):
def test_handle_lbryio_btc_response(self):
feed = exchange_rate_manager.LBRYioBTCFeed()
response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}'
response = {
'data': {
'fresh': 0, 'lbc_usd': 0.05863062523378918, 'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498
},
'success': True,
'error': None
}
out = feed._handle_response(response)
expected = 1.0 / 1157.498
self.assertEqual(expected, out)
response = '{}'
response = {}
with self.assertRaises(InvalidExchangeRateResponseError):
out = feed._handle_response(response)
response = '{"success":true,"result":[]}'
response = {
"success": True,
"result": {}
}
with self.assertRaises(InvalidExchangeRateResponseError):
out = feed._handle_response(response)
def test_handle_cryptonator_lbc_response(self):
feed = exchange_rate_manager.CryptonatorFeed()
response = '{\"ticker\":{\"base\":\"BTC\",\"target\":\"LBC\",\"price\":\"23657.44026496\"' \
',\"volume\":\"\",\"change\":\"-5.59806916\"},\"timestamp\":1507470422' \
',\"success\":true,\"error\":\"\"}'
response = {
'ticker': {
'base': 'BTC', 'target': 'LBC', 'price': 23657.44026496, 'volume': '', 'change': -5.59806916,
},
'timestamp': 1507470422,
'success': True,
'error': ""
}
out = feed._handle_response(response)
expected = 23657.44026496
self.assertEqual(expected, out)
response = '{}'
response = {}
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = '{"success":true,"ticker":{}}'
response = {
"success": True,
"ticker": {}
}
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
def test_handle_cryptonator_btc_response(self):
feed = exchange_rate_manager.CryptonatorBTCFeed()
response = '{\"ticker\":{\"base\":\"USD\",\"target\":\"BTC\",\"price\":\"0.00022123\",' \
'\"volume\":\"\",\"change\":\"-0.00000259\"},\"timestamp\":1507471141,' \
'\"success\":true,\"error\":\"\"}'
response = {
'ticker': {
'base': 'BTC', 'target': 'LBC', 'price': 0.00022123, 'volume': '', 'change': -0.00000259,
},
'timestamp': 1507471141,
'success': True,
'error': ''
}
out = feed._handle_response(response)
expected = 0.00022123
self.assertEqual(expected, out)
@ -160,26 +190,125 @@ class TestExchangeRateFeeds(unittest.TestCase):
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = '{"success":true,"ticker":{}}'
response = {
"success": True,
"ticker": {}
}
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
def test_handle_bittrex_response(self):
feed = exchange_rate_manager.BittrexFeed()
response = '{"success":true,"message":"","result":[{"Id":6902471,"TimeStamp":"2017-02-2'\
'7T23:41:52.213","Quantity":56.12611239,"Price":0.00001621,"Total":0.00090980,"FillType":"'\
'PARTIAL_FILL","OrderType":"SELL"},{"Id":6902403,"TimeStamp":"2017-02-27T23:31:40.463","Qu'\
'antity":430.99988180,"Price":0.00001592,"Total":0.00686151,"FillType":"PARTIAL_FILL","Ord'\
'erType":"SELL"}]}'
response = {
"success": True,
"message": "",
"result": [
{
'Id': 6902471, 'TimeStamp': '2017-02-27T23:41:52.213', 'Quantity': 56.12611239,
"Price": 0.00001621, "Total": 0.00090980, "FillType": "PARTIAL_FILL", "OrderType": "SELL"
},
{
"Id": 6902403, "TimeStamp": "2017-02-27t23:31:40.463", "Quantity": 430.99988180,
"Price": 0.00001592, "Total": 0.00686151, "FillType": "PARTIAL_FILL", "OrderType": "SELL"
}
]
}
out = feed._handle_response(response)
expected = 1.0 / ((0.00090980+0.00686151) / (56.12611239+430.99988180))
self.assertEqual(expected, out)
response = '{}'
response = {}
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = '{"success":true,"result":[]}'
response = {
"success": True,
"result": []
}
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
class TestMarketFeed(AsyncioTestCase):
def setUp(self):
self.feed = exchange_rate_manager.MarketFeed('some market', 'some name', 'some url', {'param': 1}, 0.005)
def test_save_price(self):
with self.assertLogs('lbry.extras.daemon.exchange_rate_manager', logging.DEBUG) as cm:
self.feed._save_price(1)
self.assertIn(self.feed.market, ' '.join(cm.output))
self.assertIn(self.feed.name, ' '.join(cm.output))
self.assertTrue(self.feed.is_online())
self.assertIsNotNone(self.feed.rate)
with self.assertRaises(ValueError):
self.feed._save_price(0)
with self.assertRaises(TypeError):
self.feed._save_price('not a price')
async def test_update_price(self):
def mock_handle_response(json_obj):
return json_obj['data']['lbc_btc']
async def get_response_body_mock(self):
return '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}'
self.feed._handle_response = mock_handle_response
with unittest.mock.patch.object(
exchange_rate_manager.AioHttpManager, 'get_response_body', get_response_body_mock
):
await self.feed._update_price()
self.assertEqual(self.feed.rate.spot, 5.090743266186672e-05)
self.assertTrue(self.feed.is_online())
async def get_response_body_mock(self):
return '<h1>not a json</h1>'
with unittest.mock.patch.object(
exchange_rate_manager.AioHttpManager, 'get_response_body', get_response_body_mock
), self.assertRaises(ValueError):
await self.feed._update_price()
class TestDeserializer(unittest.TestCase):
def test_valid_json(self):
deserializer = exchange_rate_manager.Deserializer('json')
body = '{"data": "valid json", "some_float": 3.1415, "and_a_dict": {"value": true}}'
json_obj = deserializer.deserialize(body)
self.assertEqual(json_obj['data'], 'valid json')
self.assertEqual(json_obj['some_float'], 3.1415)
self.assertTrue(json_obj['and_a_dict']['value'])
def test_invalid_json(self):
def assert_raises_error(body):
with self.assertRaises(ValueError):
deserializer.deserialize(body)
deserializer = exchange_rate_manager.Deserializer('json')
assert_raises_error('<h1>not a json</h1>')
assert_raises_error('')
assert_raises_error('{')
def test_invalid_content_type(self):
with self.assertRaises(ValueError):
exchange_rate_manager.Deserializer('not a format')
class TestAioHttpManager(AsyncioTestCase):
async def test_get_response_body(self):
async def make_request_mock(self):
response = unittest.mock.Mock(
headers={'Content-Type': 'jibberish'}
)
return response
manager = exchange_rate_manager.AioHttpManager('some url', 'some params', 'json')
with unittest.mock.patch.object(
exchange_rate_manager.AioHttpManager, '_make_request', make_request_mock
), self.assertRaises(InvalidExchangeRateResponse):
await manager.get_response_body()