Merge branch 'master' into feature/file_list_sorting

This commit is contained in:
Lex Berezhny 2018-05-29 22:47:09 -04:00 committed by GitHub
commit 23df1d0316
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
38 changed files with 595 additions and 451 deletions

View file

@ -25,6 +25,13 @@ at anytime.
*
### Changed
* check headers file integrity on startup, removing/truncating the file to force re-download when necessary
* support partial headers file download from S3
* changed txrequests for treq
* changed cryptography version to 2.2.2
* removed pycrypto dependency, replacing all calls to cryptography
* full verification of streams only during migration instead of every startup
* database batching functions for starting up the file manager
* several internal dht functions to use inlineCallbacks
* `DHTHashAnnouncer` and `Node` manage functions to use `LoopingCall`s instead of scheduling with `callLater`.
* `store` kademlia rpc method to block on the call finishing and to return storing peer information
@ -44,13 +51,19 @@ at anytime.
* regenerate api keys on startup if the using authentication
* support both positional and keyword args for api calls
* `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results
* download blockchain headers from s3 before starting the wallet when the local height is more than `s3_headers_depth` (a config setting) blocks behind
* track successful reflector uploads in sqlite to minimize how many streams are attempted by auto re-reflect
* increase the default `auto_re_reflect_interval` to a day
* predictable result sorting for `claim_list` and `claim_list_mine`
### Added
* virtual kademlia network and mock udp transport for dht integration tests
* integration tests for bootstrapping the dht
* configurable `concurrent_announcers` setting
* configurable `concurrent_announcers` and `s3_headers_depth` settings
* `peer_ping` command
* `--sort` option in `file_list`
* linux distro and desktop name added to analytics
* certifi module for Twisted SSL verification on Windows
### Removed
* `announce_all` argument from `blob_announce`
@ -58,6 +71,10 @@ at anytime.
* `AuthJSONRPCServer.auth_required` decorator
* unused `--wallet` argument to `lbrynet-daemon`, which used to be to support `PTCWallet`.
## [0.19.3] - 2018-05-04
### Changed
* download blockchain headers from s3 before starting the wallet when the local height is more than s3_headers_depth (a config setting) blocks behind (https://github.com/lbryio/lbry/pull/1177)
* un-deprecated report_bug command (https://github.com/lbryio/lbry/commit/f8e418fb4448a3ed1531657f8b3c608fb568af85)
## [0.19.2] - 2018-03-28
### Fixed

View file

@ -35,7 +35,7 @@ LBRY is a protocol that provides a fully decentralized network for the discovery
It utilizes the [LBRY blockchain](https://github.com/lbryio/lbrycrd) as a global namespace and database of digital content. Blockchain entries contain searchable content metadata, identities, and rights and access rules.
LBRY also provides a data network consists of peers uploading and downloading data from other peers, possibly in exchange for payments, and a distributed hash table, used by peers to discover other peers.
LBRY also provides a data network that consists of peers uploading and downloading data from other peers, possibly in exchange for payments, and a distributed hash table, used by peers to discover other peers.
## Contributions

View file

@ -1,6 +1,6 @@
import logging
__version__ = "0.20.0rc9"
__version__ = "0.20.0rc10"
version = tuple(__version__.split('.'))
logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -1,8 +1,8 @@
import collections
import logging
import treq
from twisted.internet import defer, task
from requests import auth
from txrequests import Session
from lbrynet import conf
from lbrynet.core import looping_call_manager, utils, system_info
@ -185,7 +185,7 @@ class Manager(object):
@staticmethod
def _make_context(platform, wallet):
return {
context = {
'app': {
'name': 'lbrynet',
'version': platform['lbrynet_version'],
@ -206,6 +206,10 @@ class Manager(object):
'version': '1.0.0'
},
}
if 'desktop' in platform and 'distro' in platform:
context['os']['desktop'] = platform['desktop']
context['os']['distro'] = platform['distro']
return context
@staticmethod
def _if_deferred(maybe_deferred, callback, *args, **kwargs):
@ -216,8 +220,8 @@ class Manager(object):
class Api(object):
def __init__(self, session, url, write_key, enabled):
self.session = session
def __init__(self, cookies, url, write_key, enabled):
self.cookies = cookies
self.url = url
self._write_key = write_key
self._enabled = enabled
@ -232,14 +236,17 @@ class Api(object):
# timeout will have expired.
#
# by forcing the connection to close, we will disable the keep-alive.
def update_cookies(response):
self.cookies.update(response.cookies())
return response
assert endpoint[0] == '/'
headers = {"Connection": "close"}
return self.session.post(
self.url + endpoint,
json=data,
auth=auth.HTTPBasicAuth(self._write_key, ''),
headers=headers
)
headers = {b"Connection": b"close"}
d = treq.post(self.url + endpoint, auth=(self._write_key, ''), json=data,
headers=headers, cookies=self.cookies)
d.addCallback(update_cookies)
return d
def track(self, event):
"""Send a single tracking event"""
@ -257,11 +264,10 @@ class Api(object):
@classmethod
def new_instance(cls, enabled=None):
"""Initialize an instance using values from the configuration"""
session = Session()
if enabled is None:
enabled = conf.settings['share_usage_data']
return cls(
session,
{},
conf.settings['ANALYTICS_ENDPOINT'],
utils.deobfuscate(conf.settings['ANALYTICS_TOKEN']),
enabled,

View file

@ -236,6 +236,7 @@ FIXED_SETTINGS = {
'SLACK_WEBHOOK': ('nUE0pUZ6Yl9bo29epl5moTSwnl5wo20ip2IlqzywMKZiIQSFZR5'
'AHx4mY0VmF0WQZ1ESEP9kMHZlp1WzJwWOoKN3ImR1M2yUAaMyqGZ='),
'WALLET_TYPES': [LBRYUM_WALLET, LBRYCRD_WALLET],
'HEADERS_FILE_SHA256_CHECKSUM': (366295, 'b0c8197153a33ccbc52fb81a279588b6015b68b7726f73f6a2b81f7e25bfe4b9')
}
ADJUSTABLE_SETTINGS = {
@ -280,10 +281,10 @@ ADJUSTABLE_SETTINGS = {
'peer_port': (int, 3333),
'pointtrader_server': (str, 'http://127.0.0.1:2424'),
'reflector_port': (int, 5566),
# if reflect_uploads is True, send files to reflector (after publishing as well as a
# periodic check in the event the initial upload failed or was disconnected part way through
# if reflect_uploads is True, send files to reflector after publishing (as well as a periodic check in the
# event the initial upload failed or was disconnected part way through, provided the auto_re_reflect_interval > 0)
'reflect_uploads': (bool, True),
'auto_re_reflect_interval': (int, 3600),
'auto_re_reflect_interval': (int, 86400), # set to 0 to disable
'reflector_servers': (list, [('reflector2.lbry.io', 5566)], server_list),
'run_reflector_server': (bool, False),
'sd_download_timeout': (int, 3),
@ -294,7 +295,8 @@ ADJUSTABLE_SETTINGS = {
'use_keyring': (bool, False),
'wallet': (str, LBRYUM_WALLET),
'blockchain_name': (str, 'lbrycrd_main'),
'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list)
'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list),
's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind
}

View file

@ -1,24 +1,30 @@
import os
from collections import defaultdict, deque
import datetime
import logging
from decimal import Decimal
import treq
from zope.interface import implements
from twisted.internet import threads, reactor, defer, task
from twisted.python.failure import Failure
from twisted.internet.error import ConnectionAborted
from hashlib import sha256
from lbryum import wallet as lbryum_wallet
from lbryum.network import Network
from lbryum.simple_config import SimpleConfig
from lbryum.constants import COIN
from lbryum.commands import Commands
from lbryum.errors import InvalidPassword
from lbryum.constants import HEADERS_URL, HEADER_SIZE
from lbryschema.uri import parse_lbry_uri
from lbryschema.claim import ClaimDict
from lbryschema.error import DecodeError
from lbryschema.decode import smart_decode
from lbrynet.txlbryum.factory import StratumClient
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.utils import DeferredDict
from lbrynet.core.client.ClientRequest import ClientRequest
@ -86,18 +92,112 @@ class Wallet(object):
self._batch_count = 20
self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims)
def start(self):
log.info("Starting wallet.")
@defer.inlineCallbacks
def fetch_headers_from_s3(self):
local_header_size = self.local_header_file_size()
resume_header = {"Range": "bytes={}-".format(local_header_size)}
response = yield treq.get(HEADERS_URL, headers=resume_header)
got_406 = response.code == 406 # our file is bigger
final_size_after_download = response.length + local_header_size
if got_406:
log.warning("s3 is more out of date than we are")
# should have something to download and a final length divisible by the header size
elif final_size_after_download and not final_size_after_download % HEADER_SIZE:
s3_height = (final_size_after_download / HEADER_SIZE) - 1
local_height = self.local_header_file_height()
if s3_height > local_height:
if local_header_size:
log.info("Resuming download of %i bytes from s3", response.length)
with open(os.path.join(self.config.path, "blockchain_headers"), "a+b") as headers_file:
yield treq.collect(response, headers_file.write)
else:
with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file:
yield treq.collect(response, headers_file.write)
log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height)
self._check_header_file_integrity()
else:
log.warning("s3 is more out of date than we are")
else:
log.error("invalid size for headers from s3")
def start_manage():
def local_header_file_height(self):
return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0)
def local_header_file_size(self):
headers_path = os.path.join(self.config.path, "blockchain_headers")
if os.path.isfile(headers_path):
return os.stat(headers_path).st_size
return 0
@defer.inlineCallbacks
def get_remote_height(self, server, port):
connected = defer.Deferred()
client = StratumClient(connected)
reactor.connectTCP(server, port, client)
yield connected
remote_height = yield client.blockchain_block_get_server_height()
client.client.transport.loseConnection()
defer.returnValue(remote_height)
@defer.inlineCallbacks
def should_download_headers_from_s3(self):
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
defer.returnValue(False)
self._check_header_file_integrity()
s3_headers_depth = conf.settings['s3_headers_depth']
if not s3_headers_depth:
defer.returnValue(False)
local_height = self.local_header_file_height()
for server_url in self.config.get('default_servers'):
port = int(self.config.get('default_servers')[server_url]['t'])
try:
remote_height = yield self.get_remote_height(server_url, port)
log.info("%s:%i height: %i, local height: %s", server_url, port, remote_height, local_height)
if remote_height > (local_height + s3_headers_depth):
defer.returnValue(True)
except Exception as err:
log.warning("error requesting remote height from %s:%i - %s", server_url, port, err)
defer.returnValue(False)
def _check_header_file_integrity(self):
# TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
return
hashsum = sha256()
checksum_height, checksum = conf.settings['HEADERS_FILE_SHA256_CHECKSUM']
checksum_length_in_bytes = checksum_height * HEADER_SIZE
if self.local_header_file_size() < checksum_length_in_bytes:
return
headers_path = os.path.join(self.config.path, "blockchain_headers")
with open(headers_path, "rb") as headers_file:
hashsum.update(headers_file.read(checksum_length_in_bytes))
current_checksum = hashsum.hexdigest()
if current_checksum != checksum:
msg = "Expected checksum {}, got {}".format(checksum, current_checksum)
log.warning("Wallet file corrupted, checksum mismatch. " + msg)
log.warning("Deleting header file so it can be downloaded again.")
os.unlink(headers_path)
elif (self.local_header_file_size() % HEADER_SIZE) != 0:
log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.")
with open(headers_path, "rb+") as headers_file:
headers_file.truncate(checksum_length_in_bytes)
@defer.inlineCallbacks
def start(self):
should_download_headers = yield self.should_download_headers_from_s3()
if should_download_headers:
try:
yield self.fetch_headers_from_s3()
except Exception as err:
log.error("failed to fetch headers from s3: %s", err)
log.info("Starting wallet.")
yield self._start()
self.stopped = False
self.manage()
self._pending_claim_checker.start(30)
return True
d = self._start()
d.addCallback(lambda _: start_manage())
return d
defer.returnValue(True)
@staticmethod
def log_stop_error(err):

View file

@ -6,8 +6,7 @@ import os
import sys
import traceback
from txrequests import Session
from requests.exceptions import ConnectionError
import treq
from twisted.internet import defer
import twisted.python.log
@ -35,13 +34,13 @@ TRACE = 5
class HTTPSHandler(logging.Handler):
def __init__(self, url, fqdn=False, localname=None, facility=None, session=None):
def __init__(self, url, fqdn=False, localname=None, facility=None, cookies=None):
logging.Handler.__init__(self)
self.url = url
self.fqdn = fqdn
self.localname = localname
self.facility = facility
self.session = session if session is not None else Session()
self.cookies = cookies or {}
def get_full_message(self, record):
if record.exc_info:
@ -52,10 +51,8 @@ class HTTPSHandler(logging.Handler):
@defer.inlineCallbacks
def _emit(self, record):
payload = self.format(record)
try:
yield self.session.post(self.url, data=payload)
except ConnectionError:
pass
response = yield treq.post(self.url, data=payload, cookies=self.cookies)
self.cookies.update(response.cookies())
def emit(self, record):
return self._emit(record)

View file

@ -36,8 +36,12 @@ def get_platform(get_ip=True):
"lbryschema_version": lbryschema_version,
"build": build_type.BUILD, # CI server sets this during build step
}
if p["os_system"] == "Linux":
import distro
p["distro"] = distro.info()
p["desktop"] = os.environ.get('XDG_CURRENT_DESKTOP', 'Unknown')
# TODO: remove this from get_platform and add a get_external_ip function using txrequests
# TODO: remove this from get_platform and add a get_external_ip function using treq
if get_ip:
try:
response = json.loads(urlopen("https://api.lbry.io/ip").read())

View file

@ -1,12 +1,12 @@
"""
Utility for creating Crypt Streams, which are encrypted blobs and associated metadata.
"""
import os
import logging
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from twisted.internet import interfaces, defer
from zope.interface import implements
from Crypto import Random
from Crypto.Cipher import AES
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker
@ -101,13 +101,13 @@ class CryptStreamCreator(object):
@staticmethod
def random_iv_generator():
while 1:
yield Random.new().read(AES.block_size)
yield os.urandom(AES.block_size / 8)
def setup(self):
"""Create the symmetric key if it wasn't provided"""
if self.key is None:
self.key = Random.new().read(AES.block_size)
self.key = os.urandom(AES.block_size / 8)
return defer.succeed(True)

View file

@ -166,6 +166,11 @@ class AlwaysSend(object):
return d
def sort_claim_results(claims):
claims.sort(key=lambda d: (d['height'], d['name'], d['claim_id'], d['txid'], d['nout']))
return claims
class Daemon(AuthJSONRPCServer):
"""
LBRYnet daemon, a jsonrpc interface to lbry functions
@ -202,7 +207,7 @@ class Daemon(AuthJSONRPCServer):
self.connected_to_internet = True
self.connection_status_code = None
self.platform = None
self.current_db_revision = 7
self.current_db_revision = 9
self.db_revision_file = conf.settings.get_db_revision_filename()
self.session = None
self._session_id = conf.settings.get_session_id()
@ -2361,7 +2366,8 @@ class Daemon(AuthJSONRPCServer):
}
"""
claims = yield self.session.wallet.get_claims_for_name(name)
claims = yield self.session.wallet.get_claims_for_name(name) # type: dict
sort_claim_results(claims['claims'])
defer.returnValue(claims)
@defer.inlineCallbacks

View file

@ -1,3 +1,12 @@
import os
import sys
# Set SSL_CERT_FILE env variable for Twisted SSL verification on Windows
# This needs to happen before anything else
if 'win' in sys.platform:
import certifi
os.environ['SSL_CERT_FILE'] = certifi.where()
from lbrynet.core import log_support
import argparse

View file

@ -1,8 +1,9 @@
import time
import requests
import logging
import json
from twisted.internet import defer, threads
import treq
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet.core.Error import InvalidExchangeRateResponse
@ -52,9 +53,10 @@ class MarketFeed(object):
def is_online(self):
return self._online
@defer.inlineCallbacks
def _make_request(self):
r = requests.get(self.url, self.params, timeout=self.REQUESTS_TIMEOUT)
return r.text
response = yield treq.get(self.url, params=self.params, timeout=self.REQUESTS_TIMEOUT)
defer.returnValue((yield response.content()))
def _handle_response(self, response):
return NotImplementedError
@ -75,7 +77,7 @@ class MarketFeed(object):
self._online = False
def _update_price(self):
d = threads.deferToThread(self._make_request)
d = self._make_request()
d.addCallback(self._handle_response)
d.addCallback(self._subtract_fee)
d.addCallback(self._save_price)

View file

@ -13,7 +13,7 @@ from txjsonrpc import jsonrpclib
from traceback import format_exc
from lbrynet import conf
from lbrynet.core.Error import InvalidAuthenticationToken, InvalidHeaderError
from lbrynet.core.Error import InvalidAuthenticationToken
from lbrynet.core import utils
from lbrynet.daemon.auth.util import APIKey, get_auth_message
from lbrynet.daemon.auth.client import LBRY_SECRET
@ -231,9 +231,9 @@ class AuthJSONRPCServer(AuthorizedBase):
def _render(self, request):
time_in = utils.now()
if not self._check_headers(request):
self._render_error(Failure(InvalidHeaderError()), request, None)
return server.NOT_DONE_YET
# if not self._check_headers(request):
# self._render_error(Failure(InvalidHeaderError()), request, None)
# return server.NOT_DONE_YET
session = request.getSession()
session_id = session.uid
finished_deferred = request.notifyFinish()

View file

@ -16,6 +16,10 @@ def migrate_db(db_dir, start, end):
from lbrynet.database.migrator.migrate5to6 import do_migration
elif current == 6:
from lbrynet.database.migrator.migrate6to7 import do_migration
elif current == 7:
from lbrynet.database.migrator.migrate7to8 import do_migration
elif current == 8:
from lbrynet.database.migrator.migrate8to9 import do_migration
else:
raise Exception("DB migration of version {} to {} is not available".format(current,
current+1))

View file

@ -0,0 +1,21 @@
import sqlite3
import os
def do_migration(db_dir):
db_path = os.path.join(db_dir, "lbrynet.sqlite")
connection = sqlite3.connect(db_path)
cursor = connection.cursor()
cursor.executescript(
"""
create table reflected_stream (
sd_hash text not null,
reflector_address text not null,
timestamp integer,
primary key (sd_hash, reflector_address)
);
"""
)
connection.commit()
connection.close()

View file

@ -0,0 +1,54 @@
import sqlite3
import logging
import os
from lbrynet.core.Error import InvalidStreamDescriptorError
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
log = logging.getLogger(__name__)
def do_migration(db_dir):
db_path = os.path.join(db_dir, "lbrynet.sqlite")
blob_dir = os.path.join(db_dir, "blobfiles")
connection = sqlite3.connect(db_path)
cursor = connection.cursor()
query = "select stream_name, stream_key, suggested_filename, sd_hash, stream_hash from stream"
streams = cursor.execute(query).fetchall()
blobs = cursor.execute("select s.stream_hash, s.position, s.iv, b.blob_hash, b.blob_length from stream_blob s "
"left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
blobs_by_stream = {}
for stream_hash, position, iv, blob_hash, blob_length in blobs:
blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv))
for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
sd_info = format_sd_info(
EncryptedFileStreamType, stream_name, stream_key,
suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash])
)
try:
validate_descriptor(sd_info)
except InvalidStreamDescriptorError as err:
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
sd_hash, err.message)
blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]]
delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir)
connection.commit()
connection.close()
def delete_stream(transaction, stream_hash, sd_hash, blob_hashes, blob_dir):
transaction.execute("delete from content_claim where stream_hash=? ", (stream_hash,))
transaction.execute("delete from file where stream_hash=? ", (stream_hash, ))
transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, ))
transaction.execute("delete from stream where stream_hash=? ", (stream_hash, ))
transaction.execute("delete from blob where blob_hash=?", (sd_hash, ))
for blob_hash in blob_hashes:
transaction.execute("delete from blob where blob_hash=?", (blob_hash, ))
file_path = os.path.join(blob_dir, blob_hash)
if os.path.isfile(file_path):
os.unlink(file_path)

View file

@ -157,6 +157,13 @@ class SQLiteStorage(object):
amount integer not null,
address text not null
);
create table if not exists reflected_stream (
sd_hash text not null,
reflector_address text not null,
timestamp integer,
primary key (sd_hash, reflector_address)
);
"""
def __init__(self, db_dir, reactor=None):
@ -545,7 +552,7 @@ class SQLiteStorage(object):
)
return self.db.runInteraction(_save_support)
def get_supports(self, claim_id):
def get_supports(self, *claim_ids):
def _format_support(outpoint, supported_id, amount, address):
return {
"txid": outpoint.split(":")[0],
@ -556,10 +563,15 @@ class SQLiteStorage(object):
}
def _get_supports(transaction):
if len(claim_ids) == 1:
bind = "=?"
else:
bind = "in ({})".format(','.join('?' for _ in range(len(claim_ids))))
return [
_format_support(*support_info)
for support_info in transaction.execute(
"select * from support where claim_id=?", (claim_id, )
"select * from support where claim_id {}".format(bind),
tuple(claim_ids)
).fetchall()
]
@ -676,51 +688,82 @@ class SQLiteStorage(object):
@defer.inlineCallbacks
def get_content_claim(self, stream_hash, include_supports=True):
def _get_content_claim(transaction):
claim_id = transaction.execute(
"select claim.claim_outpoint from content_claim "
"inner join claim on claim.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash=? "
"order by claim.rowid desc", (stream_hash, )
def _get_claim_from_stream_hash(transaction):
claim_info = transaction.execute(
"select c.*, "
"case when c.channel_claim_id is not null then "
"(select claim_name from claim where claim_id==c.channel_claim_id) "
"else null end as channel_name from content_claim "
"inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
"and content_claim.stream_hash=? order by c.rowid desc", (stream_hash,)
).fetchone()
if not claim_id:
if not claim_info:
return None
return claim_id[0]
channel_name = claim_info[-1]
result = _format_claim_response(*claim_info[:-1])
if channel_name:
result['channel_name'] = channel_name
return result
content_claim_outpoint = yield self.db.runInteraction(_get_content_claim)
result = None
if content_claim_outpoint:
result = yield self.get_claim(content_claim_outpoint, include_supports)
result = yield self.db.runInteraction(_get_claim_from_stream_hash)
if result and include_supports:
supports = yield self.get_supports(result['claim_id'])
result['supports'] = supports
result['effective_amount'] = float(
sum([support['amount'] for support in supports]) + result['amount']
)
defer.returnValue(result)
@defer.inlineCallbacks
def get_claim(self, claim_outpoint, include_supports=True):
def _claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
r = {
"name": name,
"claim_id": claim_id,
"address": address,
"claim_sequence": claim_sequence,
"value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
"height": height,
"amount": float(Decimal(amount) / Decimal(COIN)),
"nout": int(outpoint.split(":")[1]),
"txid": outpoint.split(":")[0],
"channel_claim_id": channel_id,
"channel_name": None
}
return r
def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True):
def _batch_get_claim(transaction):
results = {}
bind = "({})".format(','.join('?' for _ in range(len(stream_hashes))))
claim_infos = transaction.execute(
"select content_claim.stream_hash, c.*, "
"case when c.channel_claim_id is not null then "
"(select claim_name from claim where claim_id==c.channel_claim_id) "
"else null end as channel_name from content_claim "
"inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
"and content_claim.stream_hash in {} order by c.rowid desc".format(bind),
tuple(stream_hashes)
).fetchall()
for claim_info in claim_infos:
channel_name = claim_info[-1]
stream_hash = claim_info[0]
result = _format_claim_response(*claim_info[1:-1])
if channel_name:
result['channel_name'] = channel_name
results[stream_hash] = result
return results
claims = yield self.db.runInteraction(_batch_get_claim)
if include_supports:
all_supports = {}
for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])):
all_supports.setdefault(support['claim_id'], []).append(support)
for stream_hash in claims.keys():
claim = claims[stream_hash]
supports = all_supports.get(claim['claim_id'], [])
claim['supports'] = supports
claim['effective_amount'] = float(
sum([support['amount'] for support in supports]) + claim['amount']
)
claims[stream_hash] = claim
defer.returnValue(claims)
@defer.inlineCallbacks
def get_claim(self, claim_outpoint, include_supports=True):
def _get_claim(transaction):
claim_info = transaction.execute(
"select * from claim where claim_outpoint=?", (claim_outpoint, )
).fetchone()
result = _claim_response(*claim_info)
if result['channel_claim_id']:
channel_name_result = transaction.execute(
"select claim_name from claim where claim_id=?", (result['channel_claim_id'], )
).fetchone()
if channel_name_result:
result['channel_name'] = channel_name_result[0]
claim_info = transaction.execute("select c.*, "
"case when c.channel_claim_id is not null then "
"(select claim_name from claim where claim_id==c.channel_claim_id) "
"else null end as channel_name from claim c where claim_outpoint = ?",
(claim_outpoint,)).fetchone()
channel_name = claim_info[-1]
result = _format_claim_response(*claim_info[:-1])
if channel_name:
result['channel_name'] = channel_name
return result
result = yield self.db.runInteraction(_get_claim)
@ -765,3 +808,42 @@ class SQLiteStorage(object):
(height, outpoint)
)
return self.db.runInteraction(_save_claim_heights)
# # # # # # # # # reflector functions # # # # # # # # #
def update_reflected_stream(self, sd_hash, reflector_address, success=True):
if success:
return self.db.runOperation(
"insert or replace into reflected_stream values (?, ?, ?)",
(sd_hash, reflector_address, self.clock.seconds())
)
return self.db.runOperation(
"delete from reflected_stream where sd_hash=? and reflector_address=?",
(sd_hash, reflector_address)
)
def get_streams_to_re_reflect(self):
return self.run_and_return_list(
"select s.sd_hash from stream s "
"left outer join reflected_stream r on s.sd_hash=r.sd_hash "
"where r.timestamp is null or r.timestamp < ?",
self.clock.seconds() - conf.settings['auto_re_reflect_interval']
)
# Helper functions
def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
r = {
"name": name,
"claim_id": claim_id,
"address": address,
"claim_sequence": claim_sequence,
"value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
"height": height,
"amount": float(Decimal(amount) / Decimal(COIN)),
"nout": int(outpoint.split(":")[1]),
"txid": outpoint.split(":")[0],
"channel_claim_id": channel_id,
"channel_name": None
}
return r

View file

@ -56,10 +56,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
self.channel_name = None
self.metadata = None
@defer.inlineCallbacks
def get_claim_info(self, include_supports=True):
claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
if claim_info:
def set_claim_info(self, claim_info):
self.claim_id = claim_info['claim_id']
self.txid = claim_info['txid']
self.nout = claim_info['nout']
@ -69,6 +66,12 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
self.channel_name = claim_info['channel_name']
self.metadata = claim_info['value']['stream']['metadata']
@defer.inlineCallbacks
def get_claim_info(self, include_supports=True):
claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
if claim_info:
self.set_claim_info(claim_info)
defer.returnValue(claim_info)
@property

View file

@ -6,12 +6,11 @@ import logging
from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure
from lbrynet.core.Error import InvalidStreamDescriptorError
from lbrynet.reflector.reupload import reflect_file
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
@ -31,7 +30,7 @@ class EncryptedFileManager(object):
def __init__(self, session, sd_identifier):
self.auto_re_reflect = conf.settings['reflect_uploads']
self.auto_re_reflect = conf.settings['reflect_uploads'] and conf.settings['auto_re_reflect_interval'] > 0
self.auto_re_reflect_interval = conf.settings['auto_re_reflect_interval']
self.session = session
self.storage = session.storage
@ -96,25 +95,14 @@ class EncryptedFileManager(object):
suggested_file_name=suggested_file_name
)
@defer.inlineCallbacks
def _start_lbry_file(self, file_info, payment_rate_manager):
def _start_lbry_file(self, file_info, payment_rate_manager, claim_info):
lbry_file = self._get_lbry_file(
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
file_info['suggested_file_name']
)
yield lbry_file.get_claim_info()
try:
# verify the stream is valid (we might have downloaded an invalid stream
# in the past when the validation check didn't work)
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
validate_descriptor(stream_info)
except InvalidStreamDescriptorError as err:
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
lbry_file.sd_hash, err.message)
yield lbry_file.delete_data()
yield self.session.storage.delete_stream(lbry_file.stream_hash)
else:
if claim_info:
lbry_file.set_claim_info(claim_info)
try:
# restore will raise an Exception if status is unknown
lbry_file.restore(file_info['status'])
@ -128,19 +116,18 @@ class EncryptedFileManager(object):
@defer.inlineCallbacks
def _start_lbry_files(self):
files = yield self.session.storage.get_all_lbry_files()
claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
log.info("Starting %i files", len(files))
dl = []
for file_info in files:
dl.append(self._start_lbry_file(file_info, payment_rate_manager))
yield defer.DeferredList(dl)
claim_info = claim_infos.get(file_info['stream_hash'])
self._start_lbry_file(file_info, payment_rate_manager, claim_info)
log.info("Started %i lbry files", len(self.lbry_files))
if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval / 10)
@defer.inlineCallbacks
def _stop_lbry_file(self, lbry_file):
@ -253,7 +240,9 @@ class EncryptedFileManager(object):
def reflect_lbry_files(self):
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
ds = []
sd_hashes_to_reflect = yield self.storage.get_streams_to_re_reflect()
for lbry_file in self.lbry_files:
if lbry_file.sd_hash in sd_hashes_to_reflect:
ds.append(sem.run(reflect_file, lbry_file))
yield defer.DeferredList(ds)

View file

@ -1,10 +0,0 @@
"""
A client library for sending and receiving payments on the point trader network.
The point trader network is a simple payment system used solely for testing lbrynet-console. A user
creates a public key, registers it with the point trader server, and receives free points for
registering. The public key is used to spend points, and also used as an address to which points
are sent. To spend points, the public key signs a message containing the amount and the destination
public key and sends it to the point trader server. To check for payments, the recipient sends a
signed message asking the point trader server for its balance.
"""

View file

@ -1,230 +0,0 @@
from lbrynet import conf
from twisted.web.client import Agent, FileBodyProducer, Headers, ResponseDone
from twisted.internet import threads, defer, protocol
from Crypto.Hash import SHA
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_PSS
from StringIO import StringIO
import time
import json
import binascii
class BeginningPrinter(protocol.Protocol):
def __init__(self, finished):
self.finished = finished
self.data = ""
def dataReceived(self, bytes):
self.data = self.data + bytes
def connectionLost(self, reason):
if reason.check(ResponseDone) is not None:
self.finished.callback(str(self.data))
else:
self.finished.errback(reason)
def read_body(response):
d = defer.Deferred()
response.deliverBody(BeginningPrinter(d))
return d
def get_body(response):
if response.code != 200:
print "\n\n\n\nbad error code\n\n\n\n"
raise ValueError(response.phrase)
else:
return read_body(response)
def get_body_from_request(path, data):
from twisted.internet import reactor
jsondata = FileBodyProducer(StringIO(json.dumps(data)))
agent = Agent(reactor)
d = agent.request(
'POST', conf.settings['pointtrader_server'] + path,
Headers({'Content-Type': ['application/json']}), jsondata)
d.addCallback(get_body)
return d
def print_response(response):
pass
def print_error(err):
print err.getTraceback()
return err
def register_new_account(private_key):
data = {}
data['pub_key'] = private_key.publickey().exportKey()
def get_success_from_body(body):
r = json.loads(body)
if not 'success' in r or r['success'] is False:
return False
return True
d = get_body_from_request('/register/', data)
d.addCallback(get_success_from_body)
return d
def send_points(private_key, recipient_public_key, amount):
encoded_public_key = private_key.publickey().exportKey()
timestamp = time.time()
h = SHA.new()
h.update(encoded_public_key)
h.update(recipient_public_key)
h.update(str(amount))
h.update(str(timestamp))
signer = PKCS1_PSS.new(private_key)
signature = binascii.hexlify(signer.sign(h))
data = {}
data['sender_pub_key'] = encoded_public_key
data['recipient_pub_key'] = recipient_public_key
data['amount'] = amount
data['timestamp'] = timestamp
data['signature'] = signature
def get_success_from_body(body):
r = json.loads(body)
if not 'success' in r or r['success'] is False:
return False
return True
d = get_body_from_request('/send-points/', data)
d.addCallback(get_success_from_body)
return d
def get_recent_transactions(private_key):
encoded_public_key = private_key.publickey().exportKey()
timestamp = time.time()
h = SHA.new()
h.update(encoded_public_key)
h.update(str(timestamp))
signer = PKCS1_PSS.new(private_key)
signature = binascii.hexlify(signer.sign(h))
data = {}
data['pub_key'] = encoded_public_key
data['timestamp'] = timestamp
data['signature'] = signature
data['end_time'] = 0
data['start_time'] = 120
def get_transactions_from_body(body):
r = json.loads(body)
if "transactions" not in r:
raise ValueError("Invalid response: no 'transactions' field")
else:
return r['transactions']
d = get_body_from_request('/get-transactions/', data)
d.addCallback(get_transactions_from_body)
return d
def get_balance(private_key):
encoded_public_key = private_key.publickey().exportKey()
timestamp = time.time()
h = SHA.new()
h.update(encoded_public_key)
h.update(str(timestamp))
signer = PKCS1_PSS.new(private_key)
signature = binascii.hexlify(signer.sign(h))
data = {}
data['pub_key'] = encoded_public_key
data['timestamp'] = timestamp
data['signature'] = signature
def get_balance_from_body(body):
r = json.loads(body)
if not 'balance' in r:
raise ValueError("Invalid response: no 'balance' field")
else:
return float(r['balance'])
d = get_body_from_request('/get-balance/', data)
d.addCallback(get_balance_from_body)
return d
def run_full_test():
keys = []
def save_key(private_key):
keys.append(private_key)
return private_key
def check_balances_and_transactions(unused, bal1, bal2, num_transactions):
def assert_balance_is(actual, expected):
assert abs(actual - expected) < .05
print "correct balance. actual:", str(actual), "expected:", str(expected)
return True
def assert_transaction_length_is(transactions, expected_length):
assert len(transactions) == expected_length
print "correct transaction length"
return True
d1 = get_balance(keys[0])
d1.addCallback(assert_balance_is, bal1)
d2 = get_balance(keys[1])
d2.addCallback(assert_balance_is, bal2)
d3 = get_recent_transactions(keys[0])
d3.addCallback(assert_transaction_length_is, num_transactions)
d4 = get_recent_transactions(keys[1])
d4.addCallback(assert_transaction_length_is, num_transactions)
dl = defer.DeferredList([d1, d2, d3, d4])
return dl
def do_transfer(unused, amount):
d = send_points(keys[0], keys[1].publickey().exportKey(), amount)
return d
d1 = threads.deferToThread(RSA.generate, 4096)
d1.addCallback(save_key)
d1.addCallback(register_new_account)
d2 = threads.deferToThread(RSA.generate, 4096)
d2.addCallback(save_key)
d2.addCallback(register_new_account)
dlist = defer.DeferredList([d1, d2])
dlist.addCallback(check_balances_and_transactions, 1000, 1000, 0)
dlist.addCallback(do_transfer, 50)
dlist.addCallback(check_balances_and_transactions, 950, 1050, 1)
dlist.addCallback(do_transfer, 75)
dlist.addCallback(check_balances_and_transactions, 875, 1125, 2)
dlist.addErrback(print_error)
if __name__ == "__main__":
from twisted.internet import reactor
reactor.callLater(1, run_full_test)
reactor.callLater(25, reactor.stop)
reactor.run()

View file

@ -55,6 +55,16 @@ class EncryptedFileReflectorClient(Protocol):
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
def store_result(self, result):
if not self.needed_blobs or len(self.reflected_blobs) == len(self.needed_blobs):
reflected = True
else:
reflected = False
d = self.blob_manager.storage.update_reflected_stream(self.sd_hash, self.transport.getPeer().host, reflected)
d.addCallback(lambda _: result)
return d
def connectionLost(self, reason):
# make sure blob file readers get closed
self.set_not_uploading()
@ -68,15 +78,17 @@ class EncryptedFileReflectorClient(Protocol):
else:
log.info('Finished sending reflector %i blobs for %s',
len(self.reflected_blobs), self.stream_descriptor)
self.factory.finished_deferred.callback(self.reflected_blobs)
result = self.reflected_blobs
elif reason.check(error.ConnectionLost):
log.warning("Stopped reflecting %s after sending %i blobs",
self.stream_descriptor, len(self.reflected_blobs))
self.factory.finished_deferred.callback(self.reflected_blobs)
result = self.reflected_blobs
else:
log.info('Reflector finished for %s: %s', self.stream_descriptor,
reason)
self.factory.finished_deferred.callback(reason)
result = reason
self.factory.finished_deferred.addCallback(self.store_result)
self.factory.finished_deferred.callback(result)
# IConsumer stuff

View file

@ -4,11 +4,9 @@ import os
import platform
import shutil
import sys
import random
import unittest
from Crypto import Random
from Crypto.Hash import MD5
from hashlib import md5
from lbrynet import conf
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session
@ -22,7 +20,6 @@ from twisted.internet import defer, threads, task
from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure
from lbrynet.dht.node import Node
from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
@ -99,9 +96,6 @@ class LbryUploader(object):
from twisted.internet import reactor
self.reactor = reactor
logging.debug("Starting the uploader")
Random.atfork()
r = random.Random()
r.seed("start_lbry_uploader")
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 1)
@ -115,7 +109,7 @@ class LbryUploader(object):
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1")
self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
if self.ul_rate_limit is not None:
self.session.rate_limiter.set_ul_limit(self.ul_rate_limit)
@ -192,10 +186,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
logging.debug("Starting the uploader")
Random.atfork()
r = random.Random()
r.seed("start_lbry_reuploader")
wallet = FakeWallet()
peer_port = 5553 + n
@ -207,7 +197,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd" + str(n), dht_node_port=4446,
node_id="abcd" + str(n), dht_node_port=4446, dht_node_class=FakeNode,
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
@ -298,7 +288,6 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
logging.debug("Starting the uploader")
Random.atfork()
wallet = FakeWallet()
peer_manager = PeerManager()
@ -315,7 +304,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer,
peer_finder=peer_finder, hash_announcer=hash_announcer, dht_node_class=FakeNode,
blob_dir=blob_dir, peer_port=peer_port, dht_node_port=4446,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
@ -497,7 +486,7 @@ class TestTransfer(TestCase):
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1")
self.lbry_file_manager = EncryptedFileManager(
self.session, sd_identifier)
@ -516,7 +505,7 @@ class TestTransfer(TestCase):
def check_md5_sum():
f = open(os.path.join(db_dir, 'test_file'))
hashsum = MD5.new()
hashsum = md5()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
@ -582,7 +571,7 @@ class TestTransfer(TestCase):
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445,
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, dht_node_class=FakeNode,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1")
@ -662,7 +651,7 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, dht_node_class=FakeNode,
hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
@ -689,7 +678,7 @@ class TestTransfer(TestCase):
def check_md5_sum():
f = open(os.path.join(db_dir, 'test_file'))
hashsum = MD5.new()
hashsum = md5()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
@ -769,7 +758,7 @@ class TestTransfer(TestCase):
sd_identifier = StreamDescriptorIdentifier()
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, dht_node_class=FakeNode,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445,
hash_announcer=hash_announcer, blob_dir=blob_dir,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter,
@ -812,7 +801,7 @@ class TestTransfer(TestCase):
def check_md5_sum():
f = open('test_file')
hashsum = MD5.new()
hashsum = md5()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "e5941d615f53312fd66638239c1f90d5")

View file

@ -2,7 +2,7 @@ import os
import shutil
import tempfile
from Crypto.Hash import MD5
from hashlib import md5
from twisted.trial.unittest import TestCase
from twisted.internet import defer, threads
@ -127,7 +127,7 @@ class TestStreamify(TestCase):
self.assertTrue(lbry_file.sd_hash, sd_hash)
yield lbry_file.start()
f = open('test_file')
hashsum = MD5.new()
hashsum = md5()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")

View file

@ -1,7 +1,10 @@
import base64
import struct
import io
from Crypto.PublicKey import RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from twisted.internet import defer, error
from twisted.python.failure import Failure
@ -9,11 +12,18 @@ from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import RequestCanceledError
from lbrynet.core import BlobAvailability
from lbrynet.core.utils import generate_id
from lbrynet.dht.node import Node as RealNode
from lbrynet.daemon import ExchangeRateManager as ERM
from lbrynet import conf
from util import debug_kademlia_packet
KB = 2**10
PUBLIC_EXPONENT = 65537 # http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html
def decode_rsa_key(pem_key):
decoded = base64.b64decode(''.join(pem_key.splitlines()[1:-1]))
return serialization.load_der_public_key(decoded, default_backend())
class FakeLBRYFile(object):
@ -24,15 +34,9 @@ class FakeLBRYFile(object):
self.file_name = 'fake_lbry_file'
class Node(object):
def __init__(self, peer_finder=None, peer_manager=None, dht_node_port=None, peer_port=3333, **kwargs):
self.peer_finder = peer_finder
self.peer_manager = peer_manager
self.peerPort = peer_port
self.udpPort = dht_node_port
def joinNetwork(self, *args):
return defer.succeed(True)
class Node(RealNode):
def joinNetwork(self, known_node_addresses=None):
return defer.succeed(None)
def stop(self):
return defer.succeed(None)
@ -142,9 +146,10 @@ class PointTraderKeyQueryHandler(object):
if self.query_identifiers[0] in queries:
new_encoded_pub_key = queries[self.query_identifiers[0]]
try:
RSA.importKey(new_encoded_pub_key)
decode_rsa_key(new_encoded_pub_key)
except (ValueError, TypeError, IndexError):
return defer.fail(Failure(ValueError("Client sent an invalid public key")))
value_error = ValueError("Client sent an invalid public key: {}".format(new_encoded_pub_key))
return defer.fail(Failure(value_error))
self.public_key = new_encoded_pub_key
self.wallet.set_public_key_for_peer(self.peer, self.public_key)
fields = {'public_key': self.wallet.encoded_public_key}
@ -157,8 +162,10 @@ class PointTraderKeyQueryHandler(object):
class Wallet(object):
def __init__(self):
self.private_key = RSA.generate(1024)
self.encoded_public_key = self.private_key.publickey().exportKey()
self.private_key = rsa.generate_private_key(public_exponent=PUBLIC_EXPONENT,
key_size=1024, backend=default_backend())
self.encoded_public_key = self.private_key.public_key().public_bytes(serialization.Encoding.PEM,
serialization.PublicFormat.PKCS1)
self._config = None
self.network = None
self.wallet = None
@ -392,6 +399,7 @@ create_stream_sd_file = {
def mock_conf_settings(obj, settings={}):
conf.initialize_settings(False)
original_settings = conf.settings
conf.settings = conf.Config(conf.FIXED_SETTINGS, conf.ADJUSTABLE_SETTINGS)
conf.settings.installation_id = conf.settings.get_installation_id()

View file

@ -116,7 +116,7 @@ class MocServerProtocolFactory(ServerFactory):
class TestIntegrationConnectionManager(unittest.TestCase):
def setUp(self):
conf.initialize_settings()
conf.initialize_settings(False)
self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT)
self.downloader = MocDownloader()

View file

@ -34,7 +34,7 @@ class DHTHashAnnouncerTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
from lbrynet.conf import initialize_settings
initialize_settings()
initialize_settings(False)
self.num_blobs = 10
self.blobs_to_announce = []
for i in range(0, self.num_blobs):

View file

@ -17,7 +17,7 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj
class BlobManagerTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
conf.initialize_settings()
conf.initialize_settings(False)
self.blob_dir = tempfile.mkdtemp()
self.db_dir = tempfile.mkdtemp()
self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(self.db_dir))

View file

@ -5,11 +5,13 @@ from lbrynet.blob.blob_file import MAX_BLOB_SIZE
from lbrynet.tests.mocks import mock_conf_settings
from Crypto import Random
from Crypto.Cipher import AES
from cryptography.hazmat.primitives.ciphers.algorithms import AES
import random
import string
import StringIO
import os
AES_BLOCK_SIZE_BYTES = AES.block_size / 8
class MocBlob(object):
def __init__(self):
@ -44,8 +46,8 @@ class TestCryptBlob(unittest.TestCase):
# max blob size is 2*2**20 -1 ( -1 due to required padding in the end )
blob = MocBlob()
blob_num = 0
key = Random.new().read(AES.block_size)
iv = Random.new().read(AES.block_size)
key = os.urandom(AES_BLOCK_SIZE_BYTES)
iv = os.urandom(AES_BLOCK_SIZE_BYTES)
maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob)
write_size = size_of_data
string_to_encrypt = random_string(size_of_data)
@ -54,7 +56,7 @@ class TestCryptBlob(unittest.TestCase):
done, num_bytes = maker.write(string_to_encrypt)
yield maker.close()
self.assertEqual(size_of_data, num_bytes)
expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size
expected_encrypted_blob_size = ((size_of_data / AES_BLOCK_SIZE_BYTES) + 1) * AES_BLOCK_SIZE_BYTES
self.assertEqual(expected_encrypted_blob_size, len(blob.data))
if size_of_data < MAX_BLOB_SIZE-1:

View file

@ -0,0 +1,43 @@
import unittest
from lbrynet.daemon.Daemon import sort_claim_results
class ClaimsComparatorTest(unittest.TestCase):
def test_sort_claim_results_when_sorted_by_claim_id(self):
results = [{"height": 1, "name": "res", "claim_id": "ccc", "nout": 0, "txid": "fdsafa"},
{"height": 1, "name": "res", "claim_id": "aaa", "nout": 0, "txid": "w5tv8uorgt"},
{"height": 1, "name": "res", "claim_id": "bbb", "nout": 0, "txid": "aecfaewcfa"}]
self.run_test(results, 'claim_id', ['aaa', 'bbb', 'ccc'])
def test_sort_claim_results_when_sorted_by_height(self):
results = [{"height": 1, "name": "res", "claim_id": "ccc", "nout": 0, "txid": "aecfaewcfa"},
{"height": 3, "name": "res", "claim_id": "ccc", "nout": 0, "txid": "aecfaewcfa"},
{"height": 2, "name": "res", "claim_id": "ccc", "nout": 0, "txid": "aecfaewcfa"}]
self.run_test(results, 'height', [1, 2, 3])
def test_sort_claim_results_when_sorted_by_name(self):
results = [{"height": 1, "name": "res1", "claim_id": "ccc", "nout": 0, "txid": "aecfaewcfa"},
{"height": 1, "name": "res3", "claim_id": "ccc", "nout": 0, "txid": "aecfaewcfa"},
{"height": 1, "name": "res2", "claim_id": "ccc", "nout": 0, "txid": "aecfaewcfa"}]
self.run_test(results, 'name', ['res1', 'res2', 'res3'])
def test_sort_claim_results_when_sorted_by_txid(self):
results = [{"height": 1, "name": "res1", "claim_id": "ccc", "nout": 2, "txid": "111"},
{"height": 1, "name": "res1", "claim_id": "ccc", "nout": 1, "txid": "222"},
{"height": 1, "name": "res1", "claim_id": "ccc", "nout": 3, "txid": "333"}]
self.run_test(results, 'txid', ['111', '222', '333'])
def test_sort_claim_results_when_sorted_by_nout(self):
results = [{"height": 1, "name": "res1", "claim_id": "ccc", "nout": 2, "txid": "aecfaewcfa"},
{"height": 1, "name": "res1", "claim_id": "ccc", "nout": 1, "txid": "aecfaewcfa"},
{"height": 1, "name": "res1", "claim_id": "ccc", "nout": 3, "txid": "aecfaewcfa"}]
self.run_test(results, 'nout', [1, 2, 3])
def run_test(self, results, field, expected):
actual = sort_claim_results(results)
self.assertEqual(expected, [r[field] for r in actual])
if __name__ == '__main__':
unittest.main()

View file

@ -85,7 +85,7 @@ class StorageTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
conf.initialize_settings()
conf.initialize_settings(False)
self.db_dir = tempfile.mkdtemp()
self.storage = SQLiteStorage(self.db_dir)
yield self.storage.setup()
@ -163,6 +163,25 @@ class BlobStorageTests(StorageTest):
self.assertEqual(blob_hashes, [])
class SupportsStorageTests(StorageTest):
@defer.inlineCallbacks
def test_supports_storage(self):
claim_ids = [random_lbry_hash() for _ in range(10)]
random_supports = [{"txid": random_lbry_hash(), "nout":i, "address": "addr{}".format(i), "amount": i}
for i in range(20)]
expected_supports = {}
for idx, claim_id in enumerate(claim_ids):
yield self.storage.save_supports(claim_id, random_supports[idx*2:idx*2+2])
for random_support in random_supports[idx*2:idx*2+2]:
random_support['claim_id'] = claim_id
expected_supports.setdefault(claim_id, []).append(random_support)
supports = yield self.storage.get_supports(claim_ids[0])
self.assertEqual(supports, expected_supports[claim_ids[0]])
all_supports = yield self.storage.get_supports(*claim_ids)
for support in all_supports:
self.assertIn(support, expected_supports[support['claim_id']])
class StreamStorageTests(StorageTest):
@defer.inlineCallbacks
def test_store_stream(self, stream_hash=None):

View file

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from Crypto.Cipher import AES
from cryptography.hazmat.primitives.ciphers.algorithms import AES
import mock
from twisted.trial import unittest
from twisted.internet import defer
@ -18,7 +18,7 @@ MB = 2**20
def iv_generator():
while True:
yield '3' * AES.block_size
yield '3' * (AES.block_size / 8)
class CreateEncryptedFileTest(unittest.TestCase):
@ -47,7 +47,7 @@ class CreateEncryptedFileTest(unittest.TestCase):
@defer.inlineCallbacks
def create_file(self, filename):
handle = mocks.GenFile(3*MB, '1')
key = '2'*AES.block_size
key = '2' * (AES.block_size / 8)
out = yield EncryptedFileCreator.create_lbry_file(self.session, self.file_manager, filename, handle,
key, iv_generator())
defer.returnValue(out)

View file

@ -1,6 +1,6 @@
import mock
from twisted.trial import unittest
from lbrynet import conf
from lbrynet.tests.mocks import mock_conf_settings
from lbrynet.daemon.auth import server
@ -10,6 +10,7 @@ class AuthJSONRPCServerTest(unittest.TestCase):
# and add useful general utilities like this
# onto it.
def setUp(self):
conf.initialize_settings(False)
self.server = server.AuthJSONRPCServer(use_authentication=False)
def test_get_server_port(self):

View file

@ -6,7 +6,7 @@ from twisted.internet import defer, error
from twisted.protocols.basic import LineOnlyReceiver
from errors import RemoteServiceException, ProtocolException, ServiceException
log = logging.getLogger()
log = logging.getLogger(__name__)
class StratumClientProtocol(LineOnlyReceiver):
@ -29,12 +29,22 @@ class StratumClientProtocol(LineOnlyReceiver):
try:
self.transport.setTcpNoDelay(True)
self.transport.setTcpKeepAlive(True)
if hasattr(socket, "TCP_KEEPIDLE"):
self.transport.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE,
120) # Seconds before sending keepalive probes
else:
log.debug("TCP_KEEPIDLE not available")
if hasattr(socket, "TCP_KEEPINTVL"):
self.transport.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL,
1) # Interval in seconds between keepalive probes
else:
log.debug("TCP_KEEPINTVL not available")
if hasattr(socket, "TCP_KEEPCNT"):
self.transport.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT,
5) # Failed keepalive probles before declaring other end dead
else:
log.debug("TCP_KEEPCNT not available")
except Exception as err:
# Supported only by the socket transport,
# but there's really no better place in code to trigger this.

View file

@ -105,3 +105,6 @@ class StratumClient(ClientFactory):
def blockchain_address_get_history(self, address):
return self._rpc('blockchain.address.get_history', [address])
def blockchain_block_get_server_height(self):
return self._rpc('blockchain.block.get_server_height', [])

View file

@ -1,5 +1,6 @@
certifi==2018.4.16
Twisted==16.6.0
cryptography==2.0.3
cryptography==2.2.2
appdirs==1.4.3
argparse==1.2.1
docopt==0.6.2
@ -12,19 +13,18 @@ GitPython==2.1.3
jsonrpc==1.2
jsonrpclib==0.1.7
keyring==10.4.0
git+https://github.com/lbryio/lbryschema.git@v0.0.15#egg=lbryschema
git+https://github.com/lbryio/lbryum.git@v3.2.1#egg=lbryum
git+https://github.com/lbryio/lbryschema.git@v0.0.16rc2#egg=lbryschema
git+https://github.com/lbryio/lbryum.git@v3.2.2rc1#egg=lbryum
miniupnpc==1.9
pbkdf2==1.3
pycrypto==2.6.1
pyyaml==3.12
PyGithub==1.34
qrcode==5.2.2
requests==2.9.1
txrequests==0.9.5
service_identity==16.0.0
six>=1.9.0
slowaes==0.1a1
txJSON-RPC==0.5
wsgiref==0.1.2
zope.interface==4.3.3
treq==17.8.0

View file

@ -14,7 +14,7 @@ from lbrynet.core import log_support, Wallet, Peer
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht.hashannouncer import DummyHashAnnouncer
from lbrynet.database.storage import SQLiteStorage
log = logging.getLogger()
@ -45,13 +45,13 @@ def main(args=None):
@defer.inlineCallbacks
def download_it(peer, timeout, blob_hash):
tmp_dir = yield threads.deferToThread(tempfile.mkdtemp)
announcer = DummyHashAnnouncer()
tmp_blob_manager = DiskBlobManager(announcer, tmp_dir, tmp_dir)
storage = SQLiteStorage(tmp_dir, reactor)
yield storage.setup()
tmp_blob_manager = DiskBlobManager(tmp_dir, storage)
config = {'auto_connect': True}
if conf.settings['lbryum_wallet_dir']:
config['lbryum_path'] = conf.settings['lbryum_wallet_dir']
storage = Wallet.InMemoryStorage()
wallet = Wallet.LBRYumWallet(storage, config)
downloader = SinglePeerDownloader()
@ -76,8 +76,9 @@ def download_it(peer, timeout, blob_hash):
pass
if info:
break
time.sleep(
0.1) # there's some kind of race condition where it sometimes doesnt write the blob to disk in time
# there's some kind of race condition where it sometimes doesnt write the blob to disk in time
time.sleep(0.1)
if info is not None:
pprint(info)

View file

@ -17,18 +17,18 @@ from setuptools import setup, find_packages
requires = [
'Twisted',
'appdirs',
'distro',
'base58',
'envparse',
'jsonrpc',
'lbryschema==0.0.15',
'lbryum==3.2.1',
'lbryschema==0.0.16rc2',
'lbryum==3.2.2rc1',
'miniupnpc',
'pycrypto',
'pyyaml',
'requests',
'txrequests',
'txJSON-RPC',
'zope.interface',
'treq',
'docopt'
]