Merge pull request #1794 from lbryio/async-exchange-rate-manager

refactor exchange rate manager to use asyncio
This commit is contained in:
Jack Robison 2019-01-22 12:05:07 -05:00 committed by GitHub
commit 6fe9c5bf00
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 73 deletions

View file

@ -22,7 +22,7 @@ from lbrynet.blob.client.EncryptedFileDownloader import EncryptedFileSaverFactor
from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.dht.node import Node
from lbrynet.extras.daemon.Component import Component
from lbrynet.extras.daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.daemon.HashAnnouncer import DHTHashAnnouncer
from lbrynet.extras.reflector.server.server import ReflectorServerFactory

View file

@ -1,11 +1,9 @@
import asyncio
import aiohttp
import time
import logging
import json
import treq
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet.p2p.Error import InvalidExchangeRateResponse, CurrencyConversionError
log = logging.getLogger(__name__)
@ -38,14 +36,14 @@ class MarketFeed:
REQUESTS_TIMEOUT = 20
EXCHANGE_RATE_UPDATE_RATE_SEC = 300
def __init__(self, market, name, url, params, fee):
def __init__(self, market: str, name: str, url: str, params, fee):
self.market = market
self.name = name
self.url = url
self.params = params
self.fee = fee
self.rate = None
self._updater = LoopingCall(self._update_price)
self._task: asyncio.Task = None
self._online = True
def rate_is_initialized(self):
@ -54,17 +52,16 @@ class MarketFeed:
def is_online(self):
return self._online
@defer.inlineCallbacks
def _make_request(self):
response = yield treq.get(self.url, params=self.params, timeout=self.REQUESTS_TIMEOUT)
defer.returnValue((yield response.content()))
async def _make_request(self):
async with aiohttp.request('get', self.url, params=self.params) as response:
return (await response.read()).decode()
def _handle_response(self, response):
return NotImplementedError
raise NotImplementedError()
def _subtract_fee(self, from_amount):
# increase amount to account for market fees
return defer.succeed(from_amount / (1.0 - self.fee))
return from_amount / (1.0 - self.fee)
def _save_price(self, price):
log.debug("Saving price update %f for %s from %s" % (price, self.market, self.name))
@ -77,21 +74,23 @@ class MarketFeed:
log.debug("Exchange rate error (%s from %s): %s", self.market, self.name, err)
self._online = False
def _update_price(self):
d = self._make_request()
d.addCallback(self._handle_response)
d.addCallback(self._subtract_fee)
d.addCallback(self._save_price)
d.addErrback(self._on_error)
return d
async def _update_price(self):
while True:
try:
response = await asyncio.wait_for(self._make_request(), self.REQUESTS_TIMEOUT)
self._save_price(self._subtract_fee(self._handle_response(response)))
except (asyncio.TimeoutError, InvalidExchangeRateResponse) as err:
self._on_error(err)
await asyncio.sleep(self.EXCHANGE_RATE_UPDATE_RATE_SEC)
def start(self):
if not self._updater.running:
self._updater.start(self.EXCHANGE_RATE_UPDATE_RATE_SEC)
if not self._task:
self._task = asyncio.create_task(self._update_price())
def stop(self):
if self._updater.running:
self._updater.stop()
if self._task and not self._task.done():
self._task.cancel()
self._task = None
class BittrexFeed(MarketFeed):
@ -116,7 +115,7 @@ class BittrexFeed(MarketFeed):
if totals <= 0 or qtys <= 0:
raise InvalidExchangeRateResponse(self.market, 'quantities were not positive')
vwap = totals / qtys
return defer.succeed(float(1.0 / vwap))
return float(1.0 / vwap)
class LBRYioFeed(MarketFeed):
@ -133,7 +132,7 @@ class LBRYioFeed(MarketFeed):
json_response = json.loads(response)
if 'data' not in json_response:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(1.0 / json_response['data']['lbc_btc'])
return 1.0 / json_response['data']['lbc_btc']
class LBRYioBTCFeed(MarketFeed):
@ -153,7 +152,7 @@ class LBRYioBTCFeed(MarketFeed):
raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response)
if 'data' not in json_response:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(1.0 / json_response['data']['btc_usd'])
return 1.0 / json_response['data']['btc_usd']
class CryptonatorBTCFeed(MarketFeed):
@ -174,7 +173,7 @@ class CryptonatorBTCFeed(MarketFeed):
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(float(json_response['ticker']['price']))
return float(json_response['ticker']['price'])
class CryptonatorFeed(MarketFeed):
@ -195,7 +194,7 @@ class CryptonatorFeed(MarketFeed):
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(float(json_response['ticker']['price']))
return float(json_response['ticker']['price'])
class ExchangeRateManager:

View file

@ -14,7 +14,7 @@ from lbrynet.p2p.Error import RequestCanceledError
from lbrynet.p2p import BlobAvailability
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
from lbrynet.dht.node import Node as RealNode
from lbrynet.extras.daemon import ExchangeRateManager as ERM
from lbrynet.extras.daemon import exchange_rate_manager as ERM
KB = 2**10
PUBLIC_EXPONENT = 65537 # http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html

View file

@ -11,7 +11,7 @@ from lbrynet.p2p.BlobManager import DiskBlobManager
from lbrynet.p2p.RateLimiter import DummyRateLimiter
from lbrynet.p2p.client.DownloadManager import DownloadManager
from lbrynet.extras.daemon import Downloader
from lbrynet.extras.daemon import ExchangeRateManager
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.daemon.PeerFinder import DummyPeerFinder
from lbrynet.blob.EncryptedFileStatusReport import EncryptedFileStatusReport

View file

@ -1,8 +1,7 @@
import unittest
from lbrynet.schema.fee import Fee
from lbrynet.extras.daemon import ExchangeRateManager
from lbrynet.extras.daemon import exchange_rate_manager
from lbrynet.p2p.Error import InvalidExchangeRateResponse
from twisted.trial import unittest
from twisted.internet import defer
from tests import test_utils
from tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from tests.mocks import BTCLBCFeed, USDBTCFeed
@ -36,9 +35,9 @@ class ExchangeRateTest(unittest.TestCase):
def test_invalid_rates(self):
with self.assertRaises(ValueError):
ExchangeRateManager.ExchangeRate('USDBTC', 0, test_utils.DEFAULT_ISO_TIME)
exchange_rate_manager.ExchangeRate('USDBTC', 0, test_utils.DEFAULT_ISO_TIME)
with self.assertRaises(ValueError):
ExchangeRateManager.ExchangeRate('USDBTC', -1, test_utils.DEFAULT_ISO_TIME)
exchange_rate_manager.ExchangeRate('USDBTC', -1, test_utils.DEFAULT_ISO_TIME)
class FeeTest(unittest.TestCase):
@ -65,7 +64,7 @@ class FeeTest(unittest.TestCase):
def test_missing_feed(self):
# test when a feed is missing for conversion
fee = Fee({
'currency':'USD',
'currency': 'USD',
'amount': 1.0,
'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
})
@ -80,107 +79,96 @@ class FeeTest(unittest.TestCase):
class LBRYioFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.LBRYioFeed()
feed = exchange_rate_manager.LBRYioFeed()
response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}'
out = yield feed._handle_response(response)
out = feed._handle_response(response)
expected = 1.0 / 5.065289549855739e-05
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
response = '{"success":true,"result":[]}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
class LBRYioBTCFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.LBRYioBTCFeed()
class TestExchangeRateFeeds(unittest.TestCase):
def test_handle_lbryio_btc_response(self):
feed = exchange_rate_manager.LBRYioBTCFeed()
response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}'
out = yield feed._handle_response(response)
out = feed._handle_response(response)
expected = 1.0 / 1157.498
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
response = '{"success":true,"result":[]}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
class CryptonatorFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.CryptonatorFeed()
def test_handle_cryptonator_lbc_response(self):
feed = exchange_rate_manager.CryptonatorFeed()
response = '{\"ticker\":{\"base\":\"BTC\",\"target\":\"LBC\",\"price\":\"23657.44026496\"' \
',\"volume\":\"\",\"change\":\"-5.59806916\"},\"timestamp\":1507470422' \
',\"success\":true,\"error\":\"\"}'
out = yield feed._handle_response(response)
out = feed._handle_response(response)
expected = 23657.44026496
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
response = '{"success":true,"ticker":{}}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
class CryptonatorBTCFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.CryptonatorBTCFeed()
def test_handle_cryptonator_btc_response(self):
feed = exchange_rate_manager.CryptonatorBTCFeed()
response = '{\"ticker\":{\"base\":\"USD\",\"target\":\"BTC\",\"price\":\"0.00022123\",' \
'\"volume\":\"\",\"change\":\"-0.00000259\"},\"timestamp\":1507471141,' \
'\"success\":true,\"error\":\"\"}'
out = yield feed._handle_response(response)
out = feed._handle_response(response)
expected = 0.00022123
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
response = '{"success":true,"ticker":{}}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
class BittrexFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.BittrexFeed()
def test_handle_bittrex_response(self):
feed = exchange_rate_manager.BittrexFeed()
response = '{"success":true,"message":"","result":[{"Id":6902471,"TimeStamp":"2017-02-2'\
'7T23:41:52.213","Quantity":56.12611239,"Price":0.00001621,"Total":0.00090980,"FillType":"'\
'PARTIAL_FILL","OrderType":"SELL"},{"Id":6902403,"TimeStamp":"2017-02-27T23:31:40.463","Qu'\
'antity":430.99988180,"Price":0.00001592,"Total":0.00686151,"FillType":"PARTIAL_FILL","Ord'\
'erType":"SELL"}]}'
out = yield feed._handle_response(response)
out = feed._handle_response(response)
expected = 1.0 / ((0.00090980+0.00686151) / (56.12611239+430.99988180))
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)
response = '{"success":true,"result":[]}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
out = feed._handle_response(response)