further refactor and simplification

This commit is contained in:
Lex Berezhny 2019-12-15 01:02:18 -05:00
parent 25b6c1b6ca
commit c044e1ea8c
4 changed files with 207 additions and 393 deletions

View file

@ -1,10 +1,9 @@
import asyncio
import time
import logging
import json
import time
import asyncio
import logging
from decimal import Decimal
from typing import Optional
from json.decoder import JSONDecodeError
from typing import Optional, Iterable, Type
from aiohttp.client_exceptions import ClientError
from lbry.error import InvalidExchangeRateResponseError, CurrencyConversionError
from lbry.utils import aiohttp_request
@ -12,10 +11,6 @@ from lbry.wallet.dewies import lbc_to_dewies
log = logging.getLogger(__name__)
CURRENCY_PAIRS = ["USDBTC", "BTCLBC"]
BITTREX_FEE = 0.0025
COINBASE_FEE = 0.0 # add fee
class ExchangeRate:
def __init__(self, market, spot, ts):
@ -28,217 +23,157 @@ class ExchangeRate:
self.ts = ts
def __repr__(self):
out = f"Currency pair:{self.currency_pair}, spot:{self.spot}, ts:{self.ts}"
return out
return f"Currency pair:{self.currency_pair}, spot:{self.spot}, ts:{self.ts}"
def as_dict(self):
return {'spot': self.spot, 'ts': self.ts}
class Deserializer:
def __init__(self, content_type):
self.content_type = content_type
self._deserializer = self._get_deserializer(content_type)
def deserialize(self, body):
return self._deserializer(body)
def _get_deserializer(self, content_type):
if content_type == 'json':
return self._deserialize_json
else:
raise ValueError('Content type {content_type} is not supported')
def _deserialize_json(self, body):
try:
return json.loads(body)
except (ValueError, JSONDecodeError):
log.error('Failed to deserialize response body: %s', body)
raise
class AioHttpManager:
REQUESTS_TIMEOUT = 20
def __init__(self, url, params, content_type):
self.url = url
self.params = params
self.content_type = content_type
async def _make_request(self):
async with aiohttp_request('get', self.url, params=self.params) as response:
return await response
async def get_response_body(self):
response = await asyncio.wait_for(self._make_request(), self.REQUESTS_TIMEOUT)
if self.content_type not in response.headers.get('Content-Type'):
raise InvalidExchangeRateResponseError(self.url, f'Received response is not of type {self.content_type}')
return response.read().decode()
class MarketFeed:
EXCHANGE_RATE_UPDATE_RATE_SEC = 300
name: str
market: str
url: str
params = {}
fee = 0
def __init__(self, market: str, name: str, url: str, params: dict, fee: float,
content_type: str = 'json', network_manager=AioHttpManager,
deserializer=Deserializer):
self.market = market
self.name = name
self.fee = fee
self.rate = None
self._network_manager = network_manager(url, params, content_type)
self._deserializer = deserializer(content_type)
update_interval = 300
request_timeout = 50
def __init__(self):
self.rate: Optional[float] = None
self.last_check = 0
self._last_response = None
self._task: Optional[asyncio.Task] = None
self._online = True
self.event = asyncio.Event()
def rate_is_initialized(self):
@property
def has_rate(self):
return self.rate is not None
@property
def is_online(self):
return self._online
return self.last_check+self.update_interval+self.request_timeout > time.time()
def _on_error(self, err):
log.warning("There was a problem updating %s exchange rate information from %s",
self.market, self.name)
log.debug("Exchange rate error (%s from %s): %s", self.market, self.name, err)
self._online = False
def _handle_response(self, body):
def get_rate_from_response(self, response):
raise NotImplementedError()
def _subtract_fee(self, from_amount):
# increase amount to account for market fees
return from_amount / (1.0 - self.fee)
async def get_response(self):
async with aiohttp_request('get', self.url, params=self.params, timeout=self.request_timeout) as response:
self._last_response = await response.json()
return self._last_response
def _save_price(self, price):
log.debug("Saving price update %f for %s from %s" % (price, self.market, self.name))
self.rate = ExchangeRate(self.market, price, int(time.time()))
self._online = True
async def _get_current_price(self):
body = self._deserializer.deserialize(await self._network_manager.get_response_body())
return self._subtract_fee(self._handle_response(body))
async def _update_price(self):
async def get_rate(self):
try:
self._save_price(await self._get_current_price())
except (asyncio.TimeoutError, InvalidExchangeRateResponseError, ClientError) as err:
self._on_error(err)
data = await self.get_response()
rate = self.get_rate_from_response(data)
rate = rate / (1.0 - self.fee)
log.debug("Saving rate update %f for %s from %s", rate, self.market, self.name)
self.rate = ExchangeRate(self.market, rate, int(time.time()))
self.last_check = time.time()
self.event.set()
return self.rate
except asyncio.CancelledError:
raise
except asyncio.TimeoutError:
log.warning("Timed out fetching exchange rate from %s.", self.name)
except json.JSONDecodeError as e:
log.warning("Could not parse exchange rate response from %s: %s", self.name, e.doc)
except InvalidExchangeRateResponseError as e:
log.warning(str(e))
except Exception as e:
log.exception("Exchange rate error (%s from %s):", self.market, self.name)
async def _keep_updated(self):
async def keep_updated(self):
while True:
self._update_price()
await asyncio.sleep(self.EXCHANGE_RATE_UPDATE_RATE_SEC)
await self.get_rate()
await asyncio.sleep(self.update_interval)
def start(self):
if not self._task:
self._task = asyncio.create_task(self._keep_updated)
self._task = asyncio.create_task(self.keep_updated())
def stop(self):
if self._task and not self._task.done():
self._task.cancel()
self._task = None
self.event.clear()
class BittrexFeed(MarketFeed):
def __init__(self):
super().__init__(
"BTCLBC",
"Bittrex",
"https://bittrex.com/api/v1.1/public/getmarkethistory",
{'market': 'BTC-LBC', 'count': 50},
BITTREX_FEE
)
name = "Bittrex"
market = "BTCLBC"
url = "https://bittrex.com/api/v1.1/public/getmarkethistory"
params = {'market': 'BTC-LBC', 'count': 50}
fee = 0.0025
def _handle_response(self, json_response):
def get_rate_from_response(self, json_response):
if 'result' not in json_response:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
trades = json_response['result']
if len(trades) == 0:
raise InvalidExchangeRateResponseError(self.market, 'trades not found')
raise InvalidExchangeRateResponseError(self.name, 'trades not found')
totals = sum([i['Total'] for i in trades])
qtys = sum([i['Quantity'] for i in trades])
if totals <= 0 or qtys <= 0:
raise InvalidExchangeRateResponseError(self.market, 'quantities were not positive')
raise InvalidExchangeRateResponseError(self.name, 'quantities were not positive')
vwap = totals / qtys
return float(1.0 / vwap)
class LBRYioFeed(MarketFeed):
def __init__(self):
super().__init__(
"BTCLBC",
"lbry.com",
"https://api.lbry.com/lbc/exchange_rate",
{},
0.0,
)
class LBRYFeed(MarketFeed):
name = "lbry.com"
market = "BTCLBC"
url = "https://api.lbry.com/lbc/exchange_rate"
def _handle_response(self, json_response):
def get_rate_from_response(self, json_response):
if 'data' not in json_response:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
return 1.0 / json_response['data']['lbc_btc']
class LBRYioBTCFeed(MarketFeed):
def __init__(self):
super().__init__(
"USDBTC",
"lbry.com",
"https://api.lbry.com/lbc/exchange_rate",
{},
0.0,
)
class LBRYBTCFeed(LBRYFeed):
market = "USDBTC"
def _handle_response(self, json_response):
def get_rate_from_response(self, json_response):
if 'data' not in json_response:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
return 1.0 / json_response['data']['btc_usd']
class CryptonatorBTCFeed(MarketFeed):
def __init__(self):
super().__init__(
"USDBTC",
"cryptonator.com",
"https://api.cryptonator.com/api/ticker/usd-btc",
{},
0.0,
)
def _handle_response(self, json_response):
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
return float(json_response['ticker']['price'])
class CryptonatorFeed(MarketFeed):
def __init__(self):
super().__init__(
"BTCLBC",
"cryptonator.com",
"https://api.cryptonator.com/api/ticker/btc-lbc",
{},
0.0,
)
name = "cryptonator.com"
market = "BTCLBC"
url = "https://api.cryptonator.com/api/ticker/btc-lbc"
def _handle_response(self, json_response):
def get_rate_from_response(self, json_response):
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponseError(self.name, 'result not found')
return float(json_response['ticker']['price'])
class CryptonatorBTCFeed(CryptonatorFeed):
market = "USDBTC"
url = "https://api.cryptonator.com/api/ticker/usd-btc"
FEEDS: Iterable[Type[MarketFeed]] = (
LBRYFeed,
LBRYBTCFeed,
BittrexFeed,
CryptonatorFeed,
CryptonatorBTCFeed,
)
class ExchangeRateManager:
def __init__(self):
self.market_feeds = [
LBRYioBTCFeed(),
LBRYioFeed(),
BittrexFeed(),
# CryptonatorBTCFeed(),
# CryptonatorFeed()
]
def __init__(self, feeds=FEEDS):
self.market_feeds = [Feed() for Feed in feeds]
def wait(self):
return asyncio.wait(
[feed.event.wait() for feed in self.market_feeds],
)
def start(self):
log.info("Starting exchange rate manager")
@ -257,11 +192,11 @@ class ExchangeRateManager:
return round(amount, 8)
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
if (market.has_rate and market.is_online and
market.rate.currency_pair == (from_currency, to_currency)):
return round(amount * Decimal(market.rate.spot), 8)
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
if (market.has_rate and market.is_online and
market.rate.currency_pair[0] == from_currency):
return round(self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * Decimal(market.rate.spot)), 8)

View file

@ -0,0 +1,23 @@
from decimal import Decimal
from torba.testcase import AsyncioTestCase
from lbry.extras.daemon.exchange_rate_manager import (
ExchangeRate, ExchangeRateManager
)
class TestExchangeRateManager(AsyncioTestCase):
async def test_exchange_rate_manager(self):
manager = ExchangeRateManager()
manager.start()
self.addCleanup(manager.stop)
for feed in manager.market_feeds:
self.assertFalse(feed.is_online)
self.assertIsNone(feed.rate)
await manager.wait()
for feed in manager.market_feeds:
self.assertTrue(feed.is_online)
self.assertIsInstance(feed.rate, ExchangeRate)
lbc = manager.convert_currency('USD', 'LBC', Decimal('0.01'))
self.assertGreaterEqual(lbc, 0.1)
self.assertLessEqual(lbc, 10.0)

View file

@ -1,206 +1,137 @@
import unittest
import logging
import asyncio
from decimal import Decimal
from time import time
from lbry.schema.claim import Claim
from lbry.extras.daemon import exchange_rate_manager
from lbry.extras.daemon.exchange_rate_manager import (
ExchangeRate, ExchangeRateManager, CurrencyConversionError,
LBRYFeed, LBRYBTCFeed,
CryptonatorFeed, CryptonatorBTCFeed,
BittrexFeed,
)
from torba.testcase import AsyncioTestCase
from lbry.error import InvalidExchangeRateResponseError
from tests import test_utils
class BTCLBCFeed(exchange_rate_manager.MarketFeed):
def __init__(self):
super().__init__(
"BTCLBC",
"market name",
"derp.com",
None,
0.0
)
class USDBTCFeed(exchange_rate_manager.MarketFeed):
def __init__(self):
super().__init__(
"USDBTC",
"market name",
"derp.com",
None,
0.0
)
class DummyExchangeRateManager(exchange_rate_manager.ExchangeRateManager):
class DummyExchangeRateManager(ExchangeRateManager):
def __init__(self, market_feeds, rates):
self.market_feeds = market_feeds
for feed in self.market_feeds:
feed.rate = exchange_rate_manager.ExchangeRate(
feed.market, rates[feed.market]['spot'], rates[feed.market]['ts'])
feed.last_check = time()
feed.rate = ExchangeRate(feed.market, rates[feed.market], time())
def get_dummy_exchange_rate_manager(time):
rates = {
'BTCLBC': {'spot': 3.0, 'ts': time.time() + 1},
'USDBTC': {'spot': 2.0, 'ts': time.time() + 2}
}
return DummyExchangeRateManager([BTCLBCFeed(), USDBTCFeed()], rates)
def get_dummy_exchange_rate_manager():
return DummyExchangeRateManager(
[LBRYFeed(), LBRYBTCFeed()],
{'BTCLBC': 3.0, 'USDBTC': 2.0}
)
class ExchangeRateTest(unittest.TestCase):
def setUp(self):
test_utils.reset_time(self)
class ExchangeRateTests(AsyncioTestCase):
def test_invalid_rates(self):
with self.assertRaises(ValueError):
exchange_rate_manager.ExchangeRate('USDBTC', 0, test_utils.DEFAULT_ISO_TIME)
ExchangeRate('USDBTC', 0, time())
with self.assertRaises(ValueError):
exchange_rate_manager.ExchangeRate('USDBTC', -1, test_utils.DEFAULT_ISO_TIME)
class FeeTest(unittest.TestCase):
def setUp(self):
test_utils.reset_time(self)
ExchangeRate('USDBTC', -1, time())
def test_fee_converts_to_lbc(self):
fee = Claim().stream.fee
fee.usd = Decimal(10.0)
fee.address = "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
rates = {
'BTCLBC': {'spot': 3.0, 'ts': test_utils.DEFAULT_ISO_TIME + 1},
'USDBTC': {'spot': 2.0, 'ts': test_utils.DEFAULT_ISO_TIME + 2}
}
market_feeds = [BTCLBCFeed(), USDBTCFeed()]
manager = DummyExchangeRateManager(market_feeds, rates)
manager = get_dummy_exchange_rate_manager()
result = manager.convert_currency(fee.currency, "LBC", fee.amount)
self.assertEqual(60.0, result)
def test_missing_feed(self):
# test when a feed is missing for conversion
fee = Claim().stream.fee
fee.usd = Decimal(1.0)
fee.address = "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
rates = {
'BTCLBC': {'spot': 1.0, 'ts': test_utils.DEFAULT_ISO_TIME + 1},
}
market_feeds = [BTCLBCFeed()]
manager = DummyExchangeRateManager(market_feeds, rates)
with self.assertRaises(Exception):
manager = DummyExchangeRateManager([LBRYFeed()], {'BTCLBC': 1.0})
with self.assertRaises(CurrencyConversionError):
manager.convert_currency(fee.currency, "LBC", fee.amount)
class LBRYioFeedTest(unittest.TestCase):
def test_handle_response(self):
feed = exchange_rate_manager.LBRYioFeed()
response = {
def test_lbry_feed_response(self):
feed = LBRYFeed()
out = feed.get_rate_from_response({
'data': {
'fresh': 0, 'lbc_usd': 0.05863062523378918, 'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498
'fresh': 0, 'lbc_usd': 0.05863062523378918,
'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498
},
'success': True,
'error': None
}
out = feed._handle_response(response)
expected = 1.0 / 5.065289549855739e-05
self.assertEqual(expected, out)
response = {}
})
self.assertEqual(1.0 / 5.065289549855739e-05, out)
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = {
"success": True,
"result": []
}
feed.get_rate_from_response({})
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
feed.get_rate_from_response({
"success": True,
"result": []
})
class TestExchangeRateFeeds(unittest.TestCase):
def test_handle_lbryio_btc_response(self):
feed = exchange_rate_manager.LBRYioBTCFeed()
response = {
def test_lbry_btc_feed_response(self):
feed = LBRYBTCFeed()
out = feed.get_rate_from_response({
'data': {
'fresh': 0, 'lbc_usd': 0.05863062523378918, 'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498
'fresh': 0, 'lbc_usd': 0.05863062523378918,
'lbc_btc': 5.065289549855739e-05, 'btc_usd': 1157.498
},
'success': True,
'error': None
}
out = feed._handle_response(response)
expected = 1.0 / 1157.498
self.assertEqual(expected, out)
response = {}
})
self.assertEqual(1.0 / 1157.498, out)
with self.assertRaises(InvalidExchangeRateResponseError):
out = feed._handle_response(response)
response = {
"success": True,
"result": {}
}
feed.get_rate_from_response({})
with self.assertRaises(InvalidExchangeRateResponseError):
out = feed._handle_response(response)
feed.get_rate_from_response({
"success": True,
"result": {}
})
def test_handle_cryptonator_lbc_response(self):
feed = exchange_rate_manager.CryptonatorFeed()
response = {
def test_cryptonator_lbc_feed_response(self):
feed = CryptonatorFeed()
out = feed.get_rate_from_response({
'ticker': {
'base': 'BTC', 'target': 'LBC', 'price': 23657.44026496, 'volume': '', 'change': -5.59806916,
'base': 'BTC', 'target': 'LBC', 'price': 23657.44026496,
'volume': '', 'change': -5.59806916,
},
'timestamp': 1507470422,
'success': True,
'error': ""
}
out = feed._handle_response(response)
expected = 23657.44026496
self.assertEqual(expected, out)
response = {}
})
self.assertEqual(23_657.44026496, out)
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = {
"success": True,
"ticker": {}
}
feed.get_rate_from_response({})
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
feed.get_rate_from_response({
"success": True,
"ticker": {}
})
def test_handle_cryptonator_btc_response(self):
feed = exchange_rate_manager.CryptonatorBTCFeed()
response = {
def test_cryptonator_btc_feed_response(self):
feed = CryptonatorBTCFeed()
out = feed.get_rate_from_response({
'ticker': {
'base': 'BTC', 'target': 'LBC', 'price': 0.00022123, 'volume': '', 'change': -0.00000259,
'base': 'BTC', 'target': 'LBC', 'price': 0.00022123,
'volume': '', 'change': -0.00000259,
},
'timestamp': 1507471141,
'success': True,
'error': ''
}
out = feed._handle_response(response)
expected = 0.00022123
self.assertEqual(expected, out)
response = '{}'
})
self.assertEqual(0.00022123, out)
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = {
"success": True,
"ticker": {}
}
feed.get_rate_from_response({})
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
feed.get_rate_from_response({
"success": True,
"ticker": {}
})
def test_handle_bittrex_response(self):
feed = exchange_rate_manager.BittrexFeed()
response = {
def test_bittrex_feed_response(self):
feed = BittrexFeed()
out = feed.get_rate_from_response({
"success": True,
"message": "",
"result": [
@ -213,102 +144,27 @@ class TestExchangeRateFeeds(unittest.TestCase):
"Price": 0.00001592, "Total": 0.00686151, "FillType": "PARTIAL_FILL", "OrderType": "SELL"
}
]
}
out = feed._handle_response(response)
expected = 1.0 / ((0.00090980+0.00686151) / (56.12611239+430.99988180))
self.assertEqual(expected, out)
response = {}
})
self.assertEqual(1.0 / ((0.00090980+0.00686151) / (56.12611239+430.99988180)), out)
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
response = {
"success": True,
"result": []
}
feed.get_rate_from_response({})
with self.assertRaises(InvalidExchangeRateResponseError):
feed._handle_response(response)
feed.get_rate_from_response({
"success": True,
"result": []
})
class TestMarketFeed(AsyncioTestCase):
def setUp(self):
self.feed = exchange_rate_manager.MarketFeed('some market', 'some name', 'some url', {'param': 1}, 0.005)
class BadMarketFeed(LBRYFeed):
def test_save_price(self):
with self.assertLogs('lbry.extras.daemon.exchange_rate_manager', logging.DEBUG) as cm:
self.feed._save_price(1)
self.assertIn(self.feed.market, ' '.join(cm.output))
self.assertIn(self.feed.name, ' '.join(cm.output))
self.assertTrue(self.feed.is_online())
self.assertIsNotNone(self.feed.rate)
with self.assertRaises(ValueError):
self.feed._save_price(0)
with self.assertRaises(TypeError):
self.feed._save_price('not a price')
async def test_update_price(self):
def mock_handle_response(json_obj):
return json_obj['data']['lbc_btc']
async def get_response_body_mock(self):
return '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}'
self.feed._handle_response = mock_handle_response
with unittest.mock.patch.object(
exchange_rate_manager.AioHttpManager, 'get_response_body', get_response_body_mock
):
await self.feed._update_price()
self.assertEqual(self.feed.rate.spot, 5.090743266186672e-05)
self.assertTrue(self.feed.is_online())
async def get_response_body_mock(self):
return '<h1>not a json</h1>'
with unittest.mock.patch.object(
exchange_rate_manager.AioHttpManager, 'get_response_body', get_response_body_mock
), self.assertRaises(ValueError):
await self.feed._update_price()
def get_response(self):
raise InvalidExchangeRateResponseError(self.name, 'bad stuff')
class TestDeserializer(unittest.TestCase):
def test_valid_json(self):
deserializer = exchange_rate_manager.Deserializer('json')
body = '{"data": "valid json", "some_float": 3.1415, "and_a_dict": {"value": true}}'
json_obj = deserializer.deserialize(body)
self.assertEqual(json_obj['data'], 'valid json')
self.assertEqual(json_obj['some_float'], 3.1415)
self.assertTrue(json_obj['and_a_dict']['value'])
class ExchangeRateManagerTests(AsyncioTestCase):
def test_invalid_json(self):
def assert_raises_error(body):
with self.assertRaises(ValueError):
deserializer.deserialize(body)
deserializer = exchange_rate_manager.Deserializer('json')
assert_raises_error('<h1>not a json</h1>')
assert_raises_error('')
assert_raises_error('{')
def test_invalid_content_type(self):
with self.assertRaises(ValueError):
exchange_rate_manager.Deserializer('not a format')
class TestAioHttpManager(AsyncioTestCase):
async def test_get_response_body(self):
async def make_request_mock(self):
response = unittest.mock.Mock(
headers={'Content-Type': 'jibberish'}
)
return response
manager = exchange_rate_manager.AioHttpManager('some url', 'some params', 'json')
with unittest.mock.patch.object(
exchange_rate_manager.AioHttpManager, '_make_request', make_request_mock
), self.assertRaises(InvalidExchangeRateResponseError):
await manager.get_response_body()
async def test_get_rate_failure_retrieved(self):
manager = ExchangeRateManager([BadMarketFeed])
manager.start()
await asyncio.sleep(1)
self.addCleanup(manager.stop)

View file

@ -140,7 +140,7 @@ class TestStreamManager(BlobExchangeTestBase):
AnalyticsManager(self.client_config,
binascii.hexlify(generate_id()).decode(),
binascii.hexlify(generate_id()).decode()))
self.exchange_rate_manager = get_dummy_exchange_rate_manager(time)
self.exchange_rate_manager = get_dummy_exchange_rate_manager()
async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None):
await self.setup_stream_manager()