Merge branch 'master' into wallet_list_unspent

This commit is contained in:
Alex Grin 2017-11-01 17:35:31 -04:00 committed by GitHub
commit fcfde63720
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 176 additions and 110 deletions

View file

@ -22,15 +22,16 @@ at anytime.
*
### Changed
*
* Moved BLOB_SIZE from conf.py to MAX_BLOB_SIZE in blob/blob_file.py
*
### Added
* Added `wallet_list_unspent` command
* Added redundant API server for currency conversion
*
### Removed
*
* Removed some alternate methods of reading from blob files
*

View file

@ -1,6 +1,6 @@
import logging
__version__ = "0.17.2rc2"
__version__ = "0.17.2rc3"
version = tuple(__version__.split('.'))
logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -1,18 +1,16 @@
import logging
import os
from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer
from twisted.python.failure import Failure
from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.blob.writer import HashBlobWriter
from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0
from lbrynet.blob.reader import HashBlobReader
log = logging.getLogger(__name__)
MAX_BLOB_SIZE = 2 * 2 ** 20
class BlobFile(object):
"""
@ -78,7 +76,8 @@ class BlobFile(object):
finished
"""
if self._verified is True:
reader = HashBlobReader(self.file_path, self.reader_finished)
f = open(self.file_path, 'rb')
reader = HashBlobReader(f, self.reader_finished)
self.readers += 1
return reader
return None
@ -124,7 +123,7 @@ class BlobFile(object):
def set_length(self, length):
if self.length is not None and length == self.length:
return True
if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
if self.length is None and 0 <= length <= MAX_BLOB_SIZE:
self.length = length
return True
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
@ -142,33 +141,6 @@ class BlobFile(object):
return True
return False
def read(self, write_func):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
def close_self(*args):
self.close_read_handle(file_handle)
return args[0]
file_sender = FileSender()
reader = HashBlobReader_v0(write_func)
file_handle = self.open_for_reading()
if file_handle is not None:
d = file_sender.beginFileTransfer(file_handle, reader)
d.addCallback(close_self)
else:
d = defer.fail(IOError("Could not read the blob"))
return d
def close_read_handle(self, file_handle):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
if file_handle is not None:
file_handle.close()
def reader_finished(self, reader):
self.readers -= 1
return defer.succeed(True)

View file

@ -1,50 +1,22 @@
import logging
from twisted.internet import interfaces
from zope.interface import implements
log = logging.getLogger(__name__)
class HashBlobReader_v0(object):
"""
This is a class that is only used in StreamBlobDecryptor
and should be deprecated
"""
implements(interfaces.IConsumer)
def __init__(self, write_func):
self.write_func = write_func
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
pass
def write(self, data):
from twisted.internet import reactor
self.write_func(data)
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
class HashBlobReader(object):
"""
This is a file like reader class that supports
read(size) and close()
"""
def __init__(self, file_path, finished_cb):
def __init__(self, read_handle, finished_cb):
self.finished_cb = finished_cb
self.finished_cb_d = None
self.read_handle = open(file_path, 'rb')
self.read_handle = read_handle
def __del__(self):
if self.finished_cb_d is None:
log.warn("Garbage collection was called, but reader for %s was not closed yet",
self.read_handle.name)
self.close()
def read(self, size=-1):

View file

@ -16,6 +16,11 @@ class HashBlobWriter(object):
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
def __del__(self):
if self.finished_cb_d is None:
log.warn("Garbage collection was called, but writer was not closed yet")
self.close()
@property
def blob_hash(self):
return self._hashsum.hexdigest()

View file

@ -206,7 +206,6 @@ FIXED_SETTINGS = {
'API_ADDRESS': 'lbryapi',
'APP_NAME': APP_NAME,
'BLOBFILES_DIR': 'blobfiles',
'BLOB_SIZE': 2 * MB,
'CRYPTSD_FILE_EXTENSION': '.cryptsd',
'CURRENCIES': {
'BTC': {'type': 'crypto'},

View file

@ -1,5 +1,4 @@
from lbrynet import conf
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
class ClientRequest(object):
def __init__(self, request_dict, response_identifier=None):
@ -17,7 +16,7 @@ class ClientBlobRequest(ClientPaidRequest):
def __init__(self, request_dict, response_identifier, write_func, finished_deferred,
cancel_func, blob):
if blob.length is None:
max_pay_units = conf.settings['BLOB_SIZE']
max_pay_units = MAX_BLOB_SIZE
else:
max_pay_units = blob.length
ClientPaidRequest.__init__(self, request_dict, response_identifier, max_pay_units)

View file

@ -1,13 +1,14 @@
import binascii
import logging
from io import BytesIO
from twisted.internet import defer
from twisted.web.client import FileBodyProducer
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.backends import default_backend
from lbrynet import conf
from lbrynet.core.BlobInfo import BlobInfo
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
log = logging.getLogger(__name__)
backend = default_backend()
@ -46,6 +47,10 @@ class StreamBlobDecryptor(object):
write_func - function that takes decrypted string as
arugment and writes it somewhere
Returns:
deferred that returns after decrypting blob and writing content
"""
def remove_padding(data):
@ -67,13 +72,20 @@ class StreamBlobDecryptor(object):
last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize()
write_func(remove_padding(last_chunk))
def decrypt_bytes(data):
self.buff += data
self.len_read += len(data)
write_bytes()
d = self.blob.read(decrypt_bytes)
d.addCallback(lambda _: finish_decrypt())
read_handle = self.blob.open_for_reading()
@defer.inlineCallbacks
def decrypt_bytes():
producer = FileBodyProducer(read_handle)
buff = BytesIO()
yield producer.startProducing(buff)
self.buff = buff.getvalue()
self.len_read += len(self.buff)
write_bytes()
finish_decrypt()
d = decrypt_bytes()
return d
@ -106,7 +118,7 @@ class CryptStreamBlobMaker(object):
max bytes are written. num_bytes_to_write is the number
of bytes that will be written from data in this call
"""
max_bytes_to_write = conf.settings['BLOB_SIZE'] - self.length - 1
max_bytes_to_write = MAX_BLOB_SIZE - self.length - 1
done = False
if max_bytes_to_write <= len(data):
num_bytes_to_write = max_bytes_to_write

View file

@ -5,7 +5,6 @@ import json
from twisted.internet import defer, threads
from twisted.internet.task import LoopingCall
from lbrynet import conf
from lbrynet.core.Error import InvalidExchangeRateResponse
log = logging.getLogger(__name__)
@ -45,11 +44,14 @@ class MarketFeed(object):
self.fee = fee
self.rate = None
self._updater = LoopingCall(self._update_price)
self._online = True
@property
def rate_is_initialized(self):
return self.rate is not None
def is_online(self):
return self._online
def _make_request(self):
r = requests.get(self.url, self.params, timeout=self.REQUESTS_TIMEOUT)
return r.text
@ -62,19 +64,22 @@ class MarketFeed(object):
return defer.succeed(from_amount / (1.0 - self.fee))
def _save_price(self, price):
log.debug("Saving price update %f for %s" % (price, self.market))
log.debug("Saving price update %f for %s from %s" % (price, self.market, self.name))
self.rate = ExchangeRate(self.market, price, int(time.time()))
self._online = True
def _log_error(self, err):
log.warning("There was a problem updating %s exchange rate information from %s\n%s",
def _on_error(self, err):
log.warning(
"There was a problem updating %s exchange rate information from %s: %s",
self.market, self.name, err)
self._online = False
def _update_price(self):
d = threads.deferToThread(self._make_request)
d.addCallback(self._handle_response)
d.addCallback(self._subtract_fee)
d.addCallback(self._save_price)
d.addErrback(self._log_error)
d.addErrback(self._on_error)
return d
def start(self):
@ -92,7 +97,7 @@ class BittrexFeed(MarketFeed):
self,
"BTCLBC",
"Bittrex",
conf.settings['bittrex_feed'],
"https://bittrex.com/api/v1.1/public/getmarkethistory",
{'market': 'BTC-LBC', 'count': 50},
BITTREX_FEE
)
@ -151,24 +156,55 @@ class LBRYioBTCFeed(MarketFeed):
return defer.succeed(1.0 / json_response['data']['btc_usd'])
def get_default_market_feed(currency_pair):
currencies = None
if isinstance(currency_pair, str):
currencies = (currency_pair[0:3], currency_pair[3:6])
elif isinstance(currency_pair, tuple):
currencies = currency_pair
assert currencies is not None
class CryptonatorBTCFeed(MarketFeed):
def __init__(self):
MarketFeed.__init__(
self,
"USDBTC",
"cryptonator.com",
"https://api.cryptonator.com/api/ticker/usd-btc",
{},
0.0,
)
if currencies == ("USD", "BTC"):
return LBRYioBTCFeed()
elif currencies == ("BTC", "LBC"):
return LBRYioFeed()
def _handle_response(self, response):
try:
json_response = json.loads(response)
except ValueError:
raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response)
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(float(json_response['ticker']['price']))
class CryptonatorFeed(MarketFeed):
def __init__(self):
MarketFeed.__init__(
self,
"BTCLBC",
"cryptonator.com",
"https://api.cryptonator.com/api/ticker/btc-lbc",
{},
0.0,
)
def _handle_response(self, response):
try:
json_response = json.loads(response)
except ValueError:
raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response)
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(float(json_response['ticker']['price']))
class ExchangeRateManager(object):
def __init__(self):
self.market_feeds = [
get_default_market_feed(currency_pair) for currency_pair in CURRENCY_PAIRS]
LBRYioBTCFeed(), LBRYioFeed(), BittrexFeed(), CryptonatorBTCFeed(), CryptonatorFeed()]
def start(self):
log.info("Starting exchange rate manager")
@ -185,12 +221,13 @@ class ExchangeRateManager(object):
log.info("Converting %f %s to %s, rates: %s" % (amount, from_currency, to_currency, rates))
if from_currency == to_currency:
return amount
for market in self.market_feeds:
if (market.rate_is_initialized and
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair == (from_currency, to_currency)):
return amount * market.rate.spot
for market in self.market_feeds:
if (market.rate_is_initialized and
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair[0] == from_currency):
return self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * market.rate.spot)

View file

@ -1,7 +1,7 @@
from twisted.trial import unittest
from twisted.internet import defer
from lbrynet.cryptstream import CryptBlob
from lbrynet import conf
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
from lbrynet.tests.mocks import mock_conf_settings
@ -9,6 +9,7 @@ from Crypto import Random
from Crypto.Cipher import AES
import random
import string
import StringIO
class MocBlob(object):
def __init__(self):
@ -19,6 +20,9 @@ class MocBlob(object):
write_func(data)
return defer.succeed(True)
def open_for_reading(self):
return StringIO.StringIO(self.data)
def write(self, data):
self.data += data
@ -53,7 +57,7 @@ class TestCryptBlob(unittest.TestCase):
expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size
self.assertEqual(expected_encrypted_blob_size, len(blob.data))
if size_of_data < conf.settings['BLOB_SIZE']-1:
if size_of_data < MAX_BLOB_SIZE-1:
self.assertFalse(done)
else:
self.assertTrue(done)
@ -64,7 +68,7 @@ class TestCryptBlob(unittest.TestCase):
# decrypt string
decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data)
decryptor.decrypt(write_func)
yield decryptor.decrypt(write_func)
self.assertEqual(self.data_buf, string_to_encrypt)
@defer.inlineCallbacks

View file

@ -119,3 +119,68 @@ class LBRYioBTCFeedTest(unittest.TestCase):
response = '{"success":true,"result":[]}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
class CryptonatorFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.CryptonatorFeed()
response = '{\"ticker\":{\"base\":\"BTC\",\"target\":\"LBC\",\"price\":\"23657.44026496\"' \
',\"volume\":\"\",\"change\":\"-5.59806916\"},\"timestamp\":1507470422' \
',\"success\":true,\"error\":\"\"}'
out = yield feed._handle_response(response)
expected = 23657.44026496
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
response = '{"success":true,"ticker":{}}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
class CryptonatorBTCFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.CryptonatorBTCFeed()
response = '{\"ticker\":{\"base\":\"USD\",\"target\":\"BTC\",\"price\":\"0.00022123\",' \
'\"volume\":\"\",\"change\":\"-0.00000259\"},\"timestamp\":1507471141,' \
'\"success\":true,\"error\":\"\"}'
out = yield feed._handle_response(response)
expected = 0.00022123
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
response = '{"success":true,"ticker":{}}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
class BittrexFeedTest(unittest.TestCase):
@defer.inlineCallbacks
def test_handle_response(self):
feed = ExchangeRateManager.BittrexFeed()
response = '{"success":true,"message":"","result":[{"Id":6902471,"TimeStamp":"2017-02-2'\
'7T23:41:52.213","Quantity":56.12611239,"Price":0.00001621,"Total":0.00090980,"FillType":"'\
'PARTIAL_FILL","OrderType":"SELL"},{"Id":6902403,"TimeStamp":"2017-02-27T23:31:40.463","Qu'\
'antity":430.99988180,"Price":0.00001592,"Total":0.00686151,"FillType":"PARTIAL_FILL","Ord'\
'erType":"SELL"}]}'
out = yield feed._handle_response(response)
expected = 1.0 / ((0.00090980+0.00686151) / (56.12611239+430.99988180))
self.assertEqual(expected, out)
response = '{}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)
response = '{"success":true,"result":[]}'
with self.assertRaises(InvalidExchangeRateResponse):
out = yield feed._handle_response(response)

View file

@ -14,8 +14,8 @@ gmpy==1.17
jsonrpc==1.2
jsonrpclib==0.1.7
jsonschema==2.6.0
git+https://github.com/lbryio/lbryum.git@v3.1.10#egg=lbryum
git+https://github.com/lbryio/lbryschema.git@v0.0.13#egg=lbryschema
git+https://github.com/lbryio/lbryum.git@v3.1.11rc1#egg=lbryum
git+https://github.com/lbryio/lbryschema.git@v0.0.14rc1#egg=lbryschema
miniupnpc==1.9
pbkdf2==1.3
pycrypto==2.6.1

View file

@ -21,8 +21,8 @@ requires = [
'envparse',
'jsonrpc',
'jsonschema',
'lbryum==3.1.10',
'lbryschema==0.0.13',
'lbryum==3.1.11rc1',
'lbryschema==0.0.14rc1',
'miniupnpc',
'pycrypto',
'pyyaml',