Merge remote-tracking branch 'origin/master' into refactor-settings

Conflicts:
	lbrynet/conf.py
	lbrynet/lbrynet_daemon/Daemon.py
	lbrynet/lbrynet_daemon/DaemonCLI.py
	lbrynet/lbrynet_daemon/UIManager.py
	tests/functional/test_misc.py
This commit is contained in:
Job Evers-Meltzer 2016-10-27 10:18:56 -05:00
commit 0edacbe4c8
36 changed files with 1090 additions and 832 deletions

View file

@ -1,2 +1,6 @@
from constants import *
from events import *
from api import AnalyticsApi as Api
from api import Api
from track import Track
from manager import Manager

View file

@ -28,7 +28,7 @@ def log_response(fn):
return wrapper
class AnalyticsApi(object):
class Api(object):
def __init__(self, session, url, write_key):
self.session = session
self.url = url
@ -61,7 +61,7 @@ class AnalyticsApi(object):
@classmethod
def load(cls, session=None):
"""Initialize an instance using values from lbry.io."""
"""Initialize an instance using values from the configuration"""
if not session:
session = sessions.FuturesSession()
return cls(

View file

@ -0,0 +1,4 @@
"""Constants for metrics"""
BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
BLOB_BYTES_AVAILABLE = 'Blob Bytes Available'

View file

@ -1,6 +1,6 @@
import logging
from lbrynet.analytics import utils
from lbrynet.core import utils
log = logging.getLogger(__name__)
@ -23,30 +23,38 @@ class Events(object):
self.session_id = session_id
def heartbeat(self):
return {
'userId': 'lbry',
'event': 'Heartbeat',
'properties': {
'lbry_id': self.lbry_id,
'session_id': self.session_id
},
'context': self.context,
'timestamp': utils.now()
}
return self._event('Heartbeat')
def download_started(self, name, stream_info=None):
return {
'userId': 'lbry',
'event': 'Download Started',
'properties': {
'lbry_id': self.lbry_id,
'session_id': self.session_id,
properties = {
'name': name,
'stream_info': get_sd_hash(stream_info)
},
'context': self.context,
'timestamp': utils.now()
}
return self._event('Download Started', properties)
def metric_observed(self, metric_name, value):
properties = {
'value': value,
}
return self._event(metric_name, properties)
def _event(self, event, event_properties=None):
return {
'userId': 'lbry',
'event': event,
'properties': self._properties(event_properties),
'context': self.context,
'timestamp': utils.isonow()
}
def _properties(self, event_properties=None):
event_properties = event_properties or {}
properties = {
'lbry_id': self.lbry_id,
'session_id': self.session_id,
}
properties.update(event_properties)
return properties
def make_context(platform, wallet, is_dev=False):

View file

@ -0,0 +1,67 @@
from lbrynet.core import looping_call_manager
from twisted.internet import defer
from twisted.internet import task
import constants
class Manager(object):
def __init__(self, analytics_api, events_generator, track):
self.analytics_api = analytics_api
self.events_generator = events_generator
self.track = track
self.looping_call_manager = self.setup_looping_calls()
def setup_looping_calls(self):
call_manager = looping_call_manager.LoopingCallManager()
looping_calls = [
('send_heartbeat', self._send_heartbeat),
('update_tracked_metrics', self._update_tracked_metrics),
]
for name, fn in looping_calls:
call_manager.register_looping_call(name, task.LoopingCall(fn))
return call_manager
def start(self):
self.looping_call_manager.start('send_heartbeat', 60)
self.looping_call_manager.start('update_tracked_metrics', 300)
def shutdown(self):
self.looping_call_manager.shutdown()
def send_download_started(self, name, stream_info=None):
event = self.events_generator.download_started(name, stream_info)
self.analytics_api.track(event)
def register_repeating_metric(self, event_name, value_generator, frequency=300):
lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator)
self.looping_call_manager.register_looping_call(event_name, lcall)
lcall.start(frequency)
def _send_heartbeat(self):
heartbeat = self.events_generator.heartbeat()
self.analytics_api.track(heartbeat)
def _update_tracked_metrics(self):
should_send, value = self.track.summarize_and_reset(constants.BLOB_BYTES_UPLOADED)
if should_send:
event = self.events_generator.metric_observed(constants.BLOB_BYTES_UPLOADED, value)
self.analytics_api.track(event)
def _send_repeating_metric(self, event_name, value_generator):
result = value_generator()
if_deferred(result, self._send_repeating_metric_value, event_name)
def _send_repeating_metric_value(self, result, event_name):
should_send, value = result
if should_send:
event = self.events_generator.metric_observed(event_name, value)
self.analytics_api.track(event)
def if_deferred(maybe_deferred, callback, *args, **kwargs):
if isinstance(maybe_deferred, defer.Deferred):
maybe_deferred.addCallback(callback, *args, **kwargs)
else:
callback(maybe_deferred, *args, **kwargs)

View file

@ -0,0 +1,24 @@
import collections
class Track(object):
"""Track and summarize observations of metrics."""
def __init__(self):
self.data = collections.defaultdict(list)
def add_observation(self, metric, value):
self.data[metric].append(value)
def summarize_and_reset(self, metric, op=sum):
"""Apply `op` on the current values for `metric`.
This operation also resets the metric.
Returns:
a tuple (should_send, value)
"""
try:
values = self.data.pop(metric)
return True, op(values)
except KeyError:
return False, None

View file

@ -1,8 +0,0 @@
import datetime
from lbrynet.core.utils import *
def now():
"""Return utc now in isoformat with timezone"""
return datetime.datetime.utcnow().isoformat() + 'Z'

View file

@ -112,19 +112,30 @@ def get_loggly_url(token=None, version=None):
@_log_decorator
def configure_loggly_handler(url=None, **kwargs):
url = url or get_loggly_url()
json_format = {
"loggerName": "%(name)s",
"asciTime": "%(asctime)s",
"fileName": "%(filename)s",
"functionName": "%(funcName)s",
"levelNo": "%(levelno)s",
"lineNo": "%(lineno)d",
"levelName": "%(levelname)s",
"message": "%(message)s",
}
json_format.update(kwargs)
formatter = logging.Formatter(json.dumps(json_format))
formatter = JsonFormatter(**kwargs)
handler = HTTPSHandler(url)
handler.setFormatter(formatter)
handler.name = 'loggly'
return handler
class JsonFormatter(logging.Formatter):
"""Format log records using json serialization"""
def __init__(self, **kwargs):
self.attributes = kwargs
def format(self, record):
data = {
'loggerName': record.name,
'asciTime': self.formatTime(record),
'fileName': record.filename,
'functionName': record.funcName,
'levelNo': record.levelno,
'lineNo': record.lineno,
'levelName': record.levelname,
'message': record.getMessage(),
}
data.update(self.attributes)
if record.exc_info:
data['exc_info'] = self.formatException(record.exc_info)
return json.dumps(data)

View file

@ -0,0 +1,20 @@
class LoopingCallManager(object):
def __init__(self, calls=None):
self.calls = calls or {}
def register_looping_call(self, name, call):
assert name not in self.calls, '{} is already registered'.format(name)
self.calls[name] = call
def start(self, name, *args):
lcall = self.calls[name]
if not lcall.running:
lcall.start(*args)
def stop(self, name):
self.calls[name].stop()
def shutdown(self):
for lcall in self.calls.itervalues():
if lcall.running:
lcall.stop()

View file

@ -5,7 +5,9 @@ from twisted.protocols.basic import FileSender
from twisted.python.failure import Failure
from zope.interface import implements
from lbrynet.core.Offer import Offer
from lbrynet import analytics
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender
@ -15,15 +17,16 @@ log = logging.getLogger(__name__)
class BlobRequestHandlerFactory(object):
implements(IQueryHandlerFactory)
def __init__(self, blob_manager, wallet, payment_rate_manager):
def __init__(self, blob_manager, wallet, payment_rate_manager, track):
self.blob_manager = blob_manager
self.wallet = wallet
self.payment_rate_manager = payment_rate_manager
self.track = track
######### IQueryHandlerFactory #########
def build_query_handler(self):
q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager)
q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager, self.track)
return q_h
def get_primary_query_identifier(self):
@ -39,11 +42,12 @@ class BlobRequestHandler(object):
BLOB_QUERY = 'requested_blob'
AVAILABILITY_QUERY = 'requested_blobs'
def __init__(self, blob_manager, wallet, payment_rate_manager):
def __init__(self, blob_manager, wallet, payment_rate_manager, track):
self.blob_manager = blob_manager
self.payment_rate_manager = payment_rate_manager
self.wallet = wallet
self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY]
self.track = track
self.peer = None
self.blob_data_payment_rate = None
self.read_handle = None
@ -190,8 +194,10 @@ class BlobRequestHandler(object):
return inner_d
def count_bytes(data):
self.blob_bytes_uploaded += len(data)
self.peer.update_stats('blob_bytes_uploaded', len(data))
uploaded = len(data)
self.blob_bytes_uploaded += uploaded
self.peer.update_stats('blob_bytes_uploaded', uploaded)
self.track.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded)
return data
def start_transfer():

View file

@ -9,9 +9,10 @@ log = logging.getLogger(__name__)
class ServerRequestHandler(object):
"""This class handles requests from clients. It can upload blobs and return request for information about
more blobs that are associated with streams"""
"""This class handles requests from clients. It can upload blobs and
return request for information about more blobs that are
associated with streams.
"""
implements(interfaces.IPushProducer, interfaces.IConsumer, IRequestHandler)
def __init__(self, consumer):
@ -90,20 +91,27 @@ class ServerRequestHandler(object):
log.debug("Received data")
log.debug("%s", str(data))
if self.request_received is False:
self.request_buff = self.request_buff + data
msg = self.try_to_parse_request(self.request_buff)
if msg is not None:
self.request_buff = ''
d = self.handle_request(msg)
if self.blob_sender is not None:
d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self))
d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler)
else:
log.debug("Request buff not a valid json message")
log.debug("Request buff: %s", str(self.request_buff))
return self._parse_data_and_maybe_send_blob(data)
else:
log.warning("The client sent data when we were uploading a file. This should not happen")
def _parse_data_and_maybe_send_blob(self, data):
self.request_buff = self.request_buff + data
msg = self.try_to_parse_request(self.request_buff)
if msg:
self.request_buff = ''
self._process_msg(msg)
else:
log.debug("Request buff not a valid json message")
log.debug("Request buff: %s", self.request_buff)
def _process_msg(self, msg):
d = self.handle_request(msg)
if self.blob_sender:
d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self))
d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler)
######### IRequestHandler #########
def register_query_handler(self, query_handler, query_identifiers):

View file

@ -13,12 +13,30 @@ from lbrynet.conf import AdjustableSettings
from lbrynet.core.cryptoutils import get_lbry_hash_obj
blobhash_length = get_lbry_hash_obj().digest_size * 2 # digest_size is in bytes, and blob hashes are hex encoded
# digest_size is in bytes, and blob hashes are hex encoded
blobhash_length = get_lbry_hash_obj().digest_size * 2
log = logging.getLogger(__name__)
# defining these time functions here allows for easier overriding in testing
def now():
return datetime.datetime.now()
def utcnow():
return datetime.datetime.utcnow()
def isonow():
"""Return utc now in isoformat with timezone"""
return utcnow().isoformat() + 'Z'
def today():
return datetime.datetime.today()
def generate_id(num=None):
h = get_lbry_hash_obj()
if num is not None:
@ -28,18 +46,19 @@ def generate_id(num=None):
return h.digest()
def is_valid_hashcharacter(char):
return char in "0123456789abcdef"
def is_valid_blobhash(blobhash):
"""
"""Checks whether the blobhash is the correct length and contains only
valid characters (0-9, a-f)
@param blobhash: string, the blobhash to check
@return: Whether the blobhash is the correct length and contains only valid characters (0-9, a-f)
@return: True/False
"""
if len(blobhash) != blobhash_length:
return False
for l in blobhash:
if l not in "0123456789abcdef":
return False
return True
return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash)
def version_is_greater_than(a, b):
@ -89,10 +108,6 @@ def save_settings(path):
settings_file.write(encoder(to_save))
def today():
return datetime.datetime.today()
def check_connection(server="www.lbry.io", port=80):
"""Attempts to open a socket to server:port and returns True if successful."""
try:

View file

@ -1,3 +1,5 @@
# TODO: THERE IS A LOT OF CODE IN THIS MODULE THAT SHOULD BE REMOVED
# AS IT IS REPEATED IN THE LBRYDaemon MODULE
import logging
import os.path
import argparse
@ -12,6 +14,7 @@ from yapsy.PluginManager import PluginManager
from twisted.internet import defer, threads, stdio, task, error
from lbrynet.lbrynet_daemon.auth.client import LBRYAPIClient
from lbrynet import analytics
from lbrynet.core.Session import Session
from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl
from lbrynet.lbrynet_console.Settings import Settings
@ -366,11 +369,13 @@ class Console():
]
def get_blob_request_handler_factory(rate):
self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager,
rate)
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager,
self.session.wallet,
self.blob_request_payment_rate_manager))
self.blob_request_payment_rate_manager = PaymentRateManager(
self.session.base_payment_rate_manager, rate
)
handlers.append(BlobRequestHandlerFactory(
self.session.blob_manager, self.session.wallet,
self.blob_request_payment_rate_manager, analytics.Track()
))
d1 = self.settings.get_server_data_payment_rate()
d1.addCallback(get_blob_request_handler_factory)

View file

@ -21,6 +21,8 @@ from twisted.internet.task import LoopingCall
from txjsonrpc import jsonrpclib
from jsonschema import ValidationError
from lbrynet import __version__ as lbrynet_version
# TODO: importing this when internet is disabled raises a socket.gaierror
from lbryum.version import LBRYUM_VERSION as lbryum_version
from lbrynet import __version__ as lbrynet_version
@ -34,6 +36,7 @@ from lbrynet.core import utils
from lbrynet.core.utils import generate_id
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader
from lbrynet.core.Session import Session
from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError
@ -116,16 +119,103 @@ BAD_REQUEST = 400
NOT_FOUND = 404
OK_CODE = 200
class Checker:
"""The looping calls the daemon runs"""
INTERNET_CONNECTION = 'internet_connection_checker'
VERSION = 'version_checker'
CONNECTION_PROBLEM = 'connection_problem_checker'
PENDING_CLAIM = 'pending_claim_checker'
class FileID:
"""The different ways a file can be identified"""
NAME = 'name'
SD_HASH = 'sd_hash'
FILE_NAME = 'file_name'
# TODO add login credentials in a conf file
# TODO alert if your copy of a lbry file is out of date with the name record
REMOTE_SERVER = "www.lbry.io"
class NoValidSearch(Exception):
pass
class Parameters(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class CheckInternetConnection(object):
def __init__(self, daemon):
self.daemon = daemon
def __call__(self):
self.daemon.connected_to_internet = utils.check_connection()
class CheckRemoteVersions(object):
def __init__(self, daemon):
self.daemon = daemon
def __call__(self):
d = self._get_lbrynet_version()
d.addCallback(lambda _: self._get_lbryum_version())
def _get_lbryum_version(self):
try:
version = get_lbryum_version_from_github()
log.info(
"remote lbryum %s > local lbryum %s = %s",
version, lbryum_version,
utils.version_is_greater_than(version, lbryum_version)
)
self.daemon.git_lbryum_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbryum version from git")
self.daemon.git_lbryum_version = None
return defer.fail(None)
def _get_lbrynet_version(self):
try:
version = get_lbrynet_version_from_github()
log.info(
"remote lbrynet %s > local lbrynet %s = %s",
version, lbrynet_version,
utils.version_is_greater_than(version, lbrynet_version)
)
self.daemon.git_lbrynet_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbrynet version from git")
self.daemon.git_lbrynet_version = None
return defer.fail(None)
class AlwaysSend(object):
def __init__(self, value_generator, *args, **kwargs):
self.value_generator = value_generator
self.args = args
self.kwargs = kwargs
def __call__(self):
d = defer.maybeDeferred(self.value_generator, *self.args, **self.kwargs)
d.addCallback(lambda v: (True, v))
return d
def calculate_available_blob_size(blob_manager):
d = blob_manager.get_all_verified_blobs()
d.addCallback(
lambda blobs: defer.DeferredList([blob_manager.get_blob_length(b) for b in blobs]))
d.addCallback(lambda blob_lengths: sum(val for success, val in blob_lengths if success))
return d
class Daemon(AuthJSONRPCServer):
"""
LBRYnet daemon, a jsonrpc interface to lbry functions
@ -212,13 +302,15 @@ class Daemon(AuthJSONRPCServer):
self.pending_claims = {}
self.name_cache = {}
self.set_wallet_attributes()
self.internet_connection_checker = LoopingCall(self._check_network_connection)
self.version_checker = LoopingCall(self._check_remote_versions)
self.connection_problem_checker = LoopingCall(self._check_connection_problems)
self.pending_claim_checker = LoopingCall(self._check_pending_claims)
self.send_heartbeat = LoopingCall(self._send_heartbeat)
self.exchange_rate_manager = ExchangeRateManager()
self.lighthouse_client = LighthouseClient()
calls = {
Checker.INTERNET_CONNECTION: LoopingCall(CheckInternetConnection(self)),
Checker.VERSION: LoopingCall(CheckRemoteVersions(self)),
Checker.CONNECTION_PROBLEM: LoopingCall(self._check_connection_problems),
Checker.PENDING_CLAIM: LoopingCall(self._check_pending_claims),
}
self.looping_call_manager = LoopingCallManager(calls)
self.sd_identifier = StreamDescriptorIdentifier()
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.settings = Settings(self.db_dir)
@ -297,9 +389,9 @@ class Daemon(AuthJSONRPCServer):
log.info("Starting lbrynet-daemon")
self.internet_connection_checker.start(3600)
self.version_checker.start(3600 * 12)
self.connection_problem_checker.start(1)
self.looping_call_manager.start(Checker.INTERNET_CONNECTION, 3600)
self.looping_call_manager.start(Checker.VERSION, 3600 * 12)
self.looping_call_manager.start(Checker.CONNECTION_PROBLEM, 1)
self.exchange_rate_manager.start()
d = defer.Deferred()
@ -314,6 +406,7 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(lambda _: self._load_caches())
d.addCallback(lambda _: self._set_events())
d.addCallback(lambda _: self._get_session())
d.addCallback(lambda _: self._get_analytics())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self._setup_stream_identifier())
d.addCallback(lambda _: self._setup_lbry_file_manager())
@ -321,23 +414,11 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(lambda _: self._setup_server())
d.addCallback(lambda _: _log_starting_vals())
d.addCallback(lambda _: _announce_startup())
d.addCallback(lambda _: self._load_analytics_api())
# TODO: handle errors here
d.callback(None)
return defer.succeed(None)
def _load_analytics_api(self):
self.analytics_api = analytics.Api.load()
self.send_heartbeat.start(60)
def _send_heartbeat(self):
heartbeat = self._events.heartbeat()
self.analytics_api.track(heartbeat)
def _send_download_started(self, name, stream_info=None):
event = self._events.download_started(name, stream_info)
self.analytics_api.track(event)
def _get_platform(self):
r = {
@ -390,43 +471,6 @@ class Daemon(AuthJSONRPCServer):
d = download_sd_blob(self.session, wonderfullife_sh, self.session.base_payment_rate_manager)
d.addCallbacks(lambda _: _log_success, lambda _: _log_failure)
def _check_remote_versions(self):
def _get_lbryum_version():
try:
r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n')
version = next(line.split("=")[1].split("#")[0].replace(" ", "")
for line in r if "LBRYUM_VERSION" in line)
version = version.replace("'", "")
log.info(
"remote lbryum %s > local lbryum %s = %s",
version, lbryum_version,
utils.version_is_greater_than(version, lbryum_version)
)
self.git_lbryum_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbryum version from git")
self.git_lbryum_version = None
return defer.fail(None)
def _get_lbrynet_version():
try:
version = get_lbrynet_version_from_github()
log.info(
"remote lbrynet %s > local lbrynet %s = %s",
version, lbrynet_version,
utils.version_is_greater_than(version, lbrynet_version)
)
self.git_lbrynet_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbrynet version from git")
self.git_lbrynet_version = None
return defer.fail(None)
d = _get_lbrynet_version()
d.addCallback(lambda _: _get_lbryum_version())
def _check_connection_problems(self):
if not self.git_lbrynet_version or not self.git_lbryum_version:
self.connection_problem = CONNECTION_PROBLEM_CODES[0]
@ -453,7 +497,7 @@ class Daemon(AuthJSONRPCServer):
def _get_and_start_file(name):
d = defer.succeed(self.pending_claims.pop(name))
d.addCallback(lambda _: self._get_lbry_file("name", name, return_json=False))
d.addCallback(lambda _: self._get_lbry_file(FileID.NAME, name, return_json=False))
d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running")
def re_add_to_pending_claims(name):
@ -547,8 +591,12 @@ class Daemon(AuthJSONRPCServer):
def _setup_query_handlers(self):
handlers = [
BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
self.session.payment_rate_manager),
BlobRequestHandlerFactory(
self.session.blob_manager,
self.session.wallet,
self.session.payment_rate_manager,
self.analytics_manager.track
),
self.session.wallet.get_wallet_info_query_handler_factory(),
]
@ -612,18 +660,10 @@ class Daemon(AuthJSONRPCServer):
def _shutdown(self):
log.info("Closing lbrynet session")
log.info("Status at time of shutdown: " + self.startup_status[0])
if self.internet_connection_checker.running:
self.internet_connection_checker.stop()
if self.version_checker.running:
self.version_checker.stop()
if self.connection_problem_checker.running:
self.connection_problem_checker.stop()
self.looping_call_manager.shutdown()
self.analytics_manager.shutdown()
if self.lbry_ui_manager.update_checker.running:
self.lbry_ui_manager.update_checker.stop()
if self.pending_claim_checker.running:
self.pending_claim_checker.stop()
if self.send_heartbeat.running:
self.send_heartbeat.stop()
self._clean_up_temp_files()
@ -741,7 +781,6 @@ class Daemon(AuthJSONRPCServer):
session_id=self._session_id
)
def _setup_lbry_file_manager(self):
self.startup_status = STARTUP_STAGES[3]
self.lbry_file_metadata_manager = DBEncryptedFileMetadataManager(self.db_dir)
@ -758,6 +797,20 @@ class Daemon(AuthJSONRPCServer):
return d
def _get_analytics(self):
analytics_api = analytics.Api.load()
context = analytics.make_context(self._get_platform(), self.wallet_type)
events_generator = analytics.Events(
context, base58.b58encode(self.lbryid), self._session_id)
self.analytics_manager = analytics.Manager(
analytics_api, events_generator, analytics.Track())
self.analytics_manager.start()
self.analytics_manager.register_repeating_metric(
analytics.BLOB_BYTES_AVAILABLE,
AlwaysSend(calculate_available_blob_size, self.session.blob_manager),
frequency=300
)
def _get_session(self):
def get_default_data_rate():
d = self.settings.get_default_data_payment_rate()
@ -846,7 +899,7 @@ class Daemon(AuthJSONRPCServer):
Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file
"""
self._send_download_started(name)
self.analytics_manager.send_download_started(name, stream_info)
helper = _DownloadNameHelper(
self, name, timeout, download_directory, file_name, wait_for_write)
@ -976,108 +1029,13 @@ class Daemon(AuthJSONRPCServer):
return defer.succeed(None)
def _get_lbry_file(self, search_by, val, return_json=True):
def _log_get_lbry_file(f):
if f and val:
log.info("Found LBRY file for " + search_by + ": " + val)
elif val:
log.info("Did not find LBRY file for " + search_by + ": " + val)
return f
def _get_json_for_return(f):
def _get_file_status(file_status):
message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, file_status.num_known, file_status.running_status)
return defer.succeed(message)
def _generate_reply(size):
if f.key:
key = binascii.b2a_hex(f.key)
else:
key = None
if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
written_file = file(os.path.join(self.download_directory, f.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
if search_by == "name":
if val in self.streams.keys():
status = self.streams[val].code
elif f in self.lbry_file_manager.lbry_files:
# if f.stopped:
# status = STREAM_STAGES[3]
# else:
status = STREAM_STAGES[2]
else:
status = [False, False]
else:
status = [False, False]
if status[0] == DOWNLOAD_RUNNING_CODE:
d = f.status()
d.addCallback(_get_file_status)
d.addCallback(lambda message: {'completed': f.completed, 'file_name': f.file_name,
'download_directory': f.download_directory,
'download_path': os.path.join(f.download_directory, f.file_name),
'mime_type': mimetypes.guess_type(os.path.join(f.download_directory, f.file_name))[0],
'key': key,
'points_paid': f.points_paid, 'stopped': f.stopped,
'stream_hash': f.stream_hash,
'stream_name': f.stream_name,
'suggested_file_name': f.suggested_file_name,
'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash,
'lbry_uri': f.uri, 'txid': f.txid, 'claim_id': f.claim_id,
'total_bytes': size,
'written_bytes': written_bytes, 'code': status[0],
'message': message})
else:
d = defer.succeed({'completed': f.completed, 'file_name': f.file_name, 'key': key,
'download_directory': f.download_directory,
'download_path': os.path.join(f.download_directory, f.file_name),
'mime_type': mimetypes.guess_type(os.path.join(f.download_directory, f.file_name))[0],
'points_paid': f.points_paid, 'stopped': f.stopped, 'stream_hash': f.stream_hash,
'stream_name': f.stream_name, 'suggested_file_name': f.suggested_file_name,
'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash, 'total_bytes': size,
'written_bytes': written_bytes, 'lbry_uri': f.uri, 'txid': f.txid, 'claim_id': f.claim_id,
'code': status[0], 'message': status[1]})
return d
def _add_metadata(message):
def _add_to_dict(metadata):
message['metadata'] = metadata
return defer.succeed(message)
if f.txid:
d = self._resolve_name(f.uri)
d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation"))
else:
d = defer.succeed(message)
return d
if f:
d = f.get_total_bytes()
d.addCallback(_generate_reply)
d.addCallback(_add_metadata)
return d
else:
return False
if search_by == "name":
d = self._get_lbry_file_by_uri(val)
elif search_by == "sd_hash":
d = self._get_lbry_file_by_sd_hash(val)
elif search_by == "file_name":
d = self._get_lbry_file_by_file_name(val)
# d.addCallback(_log_get_lbry_file)
if return_json:
d.addCallback(_get_json_for_return)
return d
return _GetFileHelper(self, search_by, val, return_json).retrieve_file()
def _get_lbry_files(self):
d = defer.DeferredList([self._get_lbry_file('sd_hash', l.sd_hash) for l in self.lbry_file_manager.lbry_files])
d = defer.DeferredList([
self._get_lbry_file(FileID.SD_HASH, l.sd_hash)
for l in self.lbry_file_manager.lbry_files
])
return d
def _reflect(self, lbry_file):
@ -1424,8 +1382,7 @@ class Daemon(AuthJSONRPCServer):
return d
def jsonrpc_get_lbry_file(self, p):
"""
Get lbry file
"""Get lbry file
Args:
'name': get file by lbry uri,
@ -1443,15 +1400,18 @@ class Daemon(AuthJSONRPCServer):
'upload_allowed': bool
'sd_hash': string
"""
if p.keys()[0] in ['name', 'sd_hash', 'file_name']:
search_type = p.keys()[0]
d = self._get_lbry_file(search_type, p[search_type])
else:
d = defer.fail()
d = self._get_deferred_for_lbry_file(p)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
def _get_deferred_for_lbry_file(self, p):
try:
searchtype, value = get_lbry_file_search_value(p)
except NoValidSearch:
return defer.fail()
else:
return self._get_lbry_file(searchtype, value)
def jsonrpc_resolve_name(self, p):
"""
Resolve stream info from a LBRY uri
@ -1464,9 +1424,8 @@ class Daemon(AuthJSONRPCServer):
force = p.get('force', False)
if 'name' in p:
name = p['name']
else:
name = p.get(FileID.NAME)
if not name:
return self._render_response(None, BAD_REQUEST)
d = self._resolve_name(name, force_refresh=force)
@ -1484,7 +1443,7 @@ class Daemon(AuthJSONRPCServer):
claim info, False if no such claim exists
"""
name = p['name']
name = p[FileID.NAME]
d = self.session.wallet.get_my_claim(name)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
@ -1506,7 +1465,7 @@ class Daemon(AuthJSONRPCServer):
r['amount'] = float(r['amount']) / 10**8
return r
name = p['name']
name = p[FileID.NAME]
txid = p.get('txid', None)
d = self.session.wallet.get_claim_info(name, txid)
d.addCallback(_convert_amount_to_float)
@ -1519,11 +1478,11 @@ class Daemon(AuthJSONRPCServer):
# can spec what parameters it expects and how to set default values
timeout = p.get('timeout', self.download_timeout)
download_directory = p.get('download_directory', self.download_directory)
file_name = p.get('file_name')
file_name = p.get(FileID.FILE_NAME)
stream_info = p.get('stream_info')
sd_hash = get_sd_hash(stream_info)
wait_for_write = p.get('wait_for_write', True)
name = p.get('name')
name = p.get(FileID.NAME)
return Parameters(
timeout=timeout,
download_directory=download_directory,
@ -1579,14 +1538,20 @@ class Daemon(AuthJSONRPCServer):
"""
def _stop_file(f):
if f.stopped:
return "LBRY file wasn't running"
else:
d = self.lbry_file_manager.toggle_lbry_file_running(f)
d.addCallback(lambda _: "Stopped LBRY file")
return d
if p.keys()[0] in ['name', 'sd_hash', 'file_name']:
search_type = p.keys()[0]
d = self._get_lbry_file(search_type, p[search_type], return_json=False)
d.addCallback(lambda l: _stop_file(l) if not l.stopped else "LBRY file wasn't running")
try:
searchtype, value = get_lbry_file_search_value(p)
except NoValidSearch:
d = defer.fail()
else:
d = self._get_lbry_file(searchtype, value, return_json=False)
d.addCallback(_stop_file)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
@ -1605,13 +1570,19 @@ class Daemon(AuthJSONRPCServer):
"""
def _start_file(f):
if f.stopped:
d = self.lbry_file_manager.toggle_lbry_file_running(f)
return defer.succeed("Started LBRY file")
else:
return "LBRY file was already running"
if p.keys()[0] in ['name', 'sd_hash', 'file_name']:
search_type = p.keys()[0]
d = self._get_lbry_file(search_type, p[search_type], return_json=False)
d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running")
try:
searchtype, value = get_lbry_file_search_value(p)
except NoValidSearch:
d = defer.fail()
else:
d = self._get_lbry_file(searchtype, value, return_json=False)
d.addCallback(_start_file)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
@ -1626,7 +1597,7 @@ class Daemon(AuthJSONRPCServer):
estimated cost
"""
name = p['name']
name = p[FileID.NAME]
d = self._get_est_cost(name)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
@ -1679,21 +1650,23 @@ class Daemon(AuthJSONRPCServer):
confirmation message
"""
if 'delete_target_file' in p.keys():
delete_file = p['delete_target_file']
else:
delete_file = True
delete_file = p.get('delete_target_file', True)
def _delete_file(f):
if not f:
return False
file_name = f.file_name
d = self._delete_lbry_file(f, delete_file=delete_file)
d.addCallback(lambda _: "Deleted LBRY file" + file_name)
return d
if 'name' in p.keys() or 'sd_hash' in p.keys() or 'file_name' in p.keys():
search_type = [k for k in p.keys() if k != 'delete_target_file'][0]
d = self._get_lbry_file(search_type, p[search_type], return_json=False)
d.addCallback(lambda l: _delete_file(l) if l else False)
try:
searchtype, value = get_lbry_file_search_value(p)
except NoValidSearch:
d = defer.fail()
else:
d = self._get_lbry_file(searchtype, value, return_json=False)
d.addCallback(_delete_file)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
@ -1719,12 +1692,12 @@ class Daemon(AuthJSONRPCServer):
return m
def _reflect_if_possible(sd_hash, txid):
d = self._get_lbry_file('sd_hash', sd_hash, return_json=False)
d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
d.addCallback(self._reflect)
d.addCallback(lambda _: txid)
return d
name = p['name']
name = p[FileID.NAME]
log.info("Publish: ")
log.info(p)
@ -1752,8 +1725,7 @@ class Daemon(AuthJSONRPCServer):
if not os.path.isfile(file_path):
return defer.fail(Exception("Specified file for publish doesnt exist: %s" % file_path))
if not self.pending_claim_checker.running:
self.pending_claim_checker.start(30)
self.looping_call_manager.start(Checker.PENDING_CLAIM, 30)
d = self._resolve_name(name, force_refresh=True)
d.addErrback(lambda _: None)
@ -1833,7 +1805,7 @@ class Daemon(AuthJSONRPCServer):
txid
"""
name = p['name']
name = p[FileID.NAME]
claim_id = p['claim_id']
amount = p['amount']
d = self.session.wallet.support_claim(name, claim_id, amount)
@ -1874,7 +1846,7 @@ class Daemon(AuthJSONRPCServer):
list of name claims
"""
name = p['name']
name = p[FileID.NAME]
d = self.session.wallet.get_claims_for_name(name)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
@ -2076,11 +2048,12 @@ class Daemon(AuthJSONRPCServer):
Returns
sd blob, dict
"""
sd_hash = p['sd_hash']
sd_hash = p[FileID.SD_HASH]
timeout = p.get('timeout', lbrynet_settings.sd_download_timeout)
d = self._download_sd_blob(sd_hash, timeout)
d.addCallbacks(lambda r: self._render_response(r, OK_CODE), lambda _: self._render_response(False, OK_CODE))
d.addCallbacks(
lambda r: self._render_response(r, OK_CODE),
lambda _: self._render_response(False, OK_CODE))
return d
def jsonrpc_get_nametrie(self):
@ -2273,8 +2246,8 @@ class Daemon(AuthJSONRPCServer):
True or traceback
"""
sd_hash = p['sd_hash']
d = self._get_lbry_file('sd_hash', sd_hash, return_json=False)
sd_hash = p[FileID.SD_HASH]
d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
d.addCallback(self._reflect)
d.addCallbacks(lambda _: self._render_response(True, OK_CODE), lambda err: self._render_response(err.getTraceback(), OK_CODE))
return d
@ -2354,7 +2327,7 @@ class Daemon(AuthJSONRPCServer):
else:
return 0.0
name = p['name']
name = p[FileID.NAME]
d = self._resolve_name(name, force_refresh=True)
d.addCallback(get_sd_hash)
@ -2374,6 +2347,14 @@ class Daemon(AuthJSONRPCServer):
return self._render_response("Not using authentication", OK_CODE)
def get_lbryum_version_from_github():
r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n')
version = next(line.split("=")[1].split("#")[0].replace(" ", "")
for line in r if "LBRYUM_VERSION" in line)
version = version.replace("'", "")
return version
def get_lbrynet_version_from_github():
"""Return the latest released version from github."""
response = requests.get('https://api.github.com/repos/lbryio/lbry/releases/latest')
@ -2590,3 +2571,130 @@ class _ResolveNameHelper(object):
def is_cached_name_expired(self):
time_in_cache = self.now() - self.name_data['timestamp']
return time_in_cache >= self.daemon.cache_time
class _GetFileHelper(object):
def __init__(self, daemon, search_by, val, return_json=True):
self.daemon = daemon
self.search_by = search_by
self.val = val
self.return_json = return_json
def retrieve_file(self):
d = self.search_for_file()
if self.return_json:
d.addCallback(self._get_json)
return d
def search_for_file(self):
if self.search_by == FileID.NAME:
return self.daemon._get_lbry_file_by_uri(self.val)
elif self.search_by == FileID.SD_HASH:
return self.daemon._get_lbry_file_by_sd_hash(self.val)
elif self.search_by == FileID.FILE_NAME:
return self.daemon._get_lbry_file_by_file_name(self.val)
raise Exception('{} is not a valid search operation'.format(self.search_by))
def _get_json(self, lbry_file):
if lbry_file:
d = lbry_file.get_total_bytes()
d.addCallback(self._generate_reply, lbry_file)
d.addCallback(self._add_metadata, lbry_file)
return d
else:
return False
def _generate_reply(self, size, lbry_file):
written_bytes = self._get_written_bytes(lbry_file)
code, message = self._get_status(lbry_file)
if code == DOWNLOAD_RUNNING_CODE:
d = lbry_file.status()
d.addCallback(self._get_msg_for_file_status)
d.addCallback(
lambda msg: self._get_properties_dict(lbry_file, code, msg, written_bytes, size))
else:
d = defer.succeed(
self._get_properties_dict(lbry_file, code, message, written_bytes, size))
return d
def _get_msg_for_file_status(self, file_status):
message = STREAM_STAGES[2][1] % (
file_status.name, file_status.num_completed, file_status.num_known,
file_status.running_status)
return defer.succeed(message)
def _get_key(self, lbry_file):
return binascii.b2a_hex(lbry_file.key) if lbry_file.key else None
def _full_path(self, lbry_file):
return os.path.join(lbry_file.download_directory, lbry_file.file_name)
def _get_status(self, lbry_file):
if self.search_by == FileID.NAME:
if self.val in self.daemon.streams.keys():
status = self.daemon.streams[self.val].code
elif lbry_file in self.daemon.lbry_file_manager.lbry_files:
status = STREAM_STAGES[2]
else:
status = [False, False]
else:
status = [False, False]
return status
def _get_written_bytes(self, lbry_file):
full_path = self._full_path(lbry_file)
if os.path.isfile(full_path):
with open(full_path) as written_file:
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
else:
written_bytes = False
return written_bytes
def _get_properties_dict(self, lbry_file, code, message, written_bytes, size):
key = self._get_key(lbry_file)
full_path = self._full_path(lbry_file)
mime_type = mimetypes.guess_type(full_path)[0]
return {
'completed': lbry_file.completed,
'file_name': lbry_file.file_name,
'download_directory': lbry_file.download_directory,
'points_paid': lbry_file.points_paid,
'stopped': lbry_file.stopped,
'stream_hash': lbry_file.stream_hash,
'stream_name': lbry_file.stream_name,
'suggested_file_name': lbry_file.suggested_file_name,
'upload_allowed': lbry_file.upload_allowed,
'sd_hash': lbry_file.sd_hash,
'lbry_uri': lbry_file.uri,
'txid': lbry_file.txid,
'claim_id': lbry_file.claim_id,
'download_path': full_path,
'mime_type': mime_type,
'key': key,
'total_bytes': size,
'written_bytes': written_bytes,
'code': code,
'message': message
}
def _add_metadata(self, message, lbry_file):
def _add_to_dict(metadata):
message['metadata'] = metadata
return defer.succeed(message)
if lbry_file.txid:
d = self.daemon._resolve_name(lbry_file.uri)
d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation"))
else:
d = defer.succeed(message)
return d
def get_lbry_file_search_value(p):
for searchtype in (FileID.SD_HASH, FileID.NAME, FileID.FILE_NAME):
value = p.get(searchtype)
if value:
return searchtype, value
raise NoValidSearch()

View file

@ -84,6 +84,11 @@ def main():
result = LBRYAPIClient.config(service=meth, params=params)
print json.dumps(result, sort_keys=True)
except:
# TODO: The api should return proper error codes
# and messages so that they can be passed along to the user
# instead of this generic message.
# https://app.asana.com/0/158602294500137/200173944358192
print "Something went wrong, here's the usage for %s:" % meth
print api.help({'function': meth})
else:

View file

@ -104,9 +104,12 @@ class UIManager(object):
def _up_to_date(self):
def _get_git_info():
try:
response = urlopen(self._git_url)
data = json.loads(response.read())
return defer.succeed(data['sha'])
except Exception:
return defer.fail()
def _set_git(version):
self.git_version = version.replace('\n', '')

View file

@ -1,33 +0,0 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
""" Wrapper script to run all included test scripts """
import os, sys
import unittest
def runTests():
testRunner = unittest.TextTestRunner()
testRunner.run(additional_tests())
def additional_tests():
""" Used directly by setuptools to run unittests """
sys.path.insert(0, os.path.dirname(__file__))
suite = unittest.TestSuite()
tests = os.listdir(os.path.dirname(__file__))
tests = [n[:-3] for n in tests if n.startswith('test') and n.endswith('.py')]
for test in tests:
m = __import__(test)
if hasattr(m, 'suite'):
suite.addTest(m.suite())
sys.path.pop(0)
return suite
if __name__ == '__main__':
# Add parent folder to sys path so it's easier to use
sys.path.insert(0,os.path.abspath('..'))
runTests()

View file

@ -1,47 +0,0 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
import unittest
import lbrynet.dht.contact
class ContactOperatorsTest(unittest.TestCase):
""" Basic tests case for boolean operators on the Contact class """
def setUp(self):
self.firstContact = lbrynet.dht.contact.Contact('firstContactID', '127.0.0.1', 1000, None, 1)
self.secondContact = lbrynet.dht.contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32)
self.secondContactCopy = lbrynet.dht.contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32)
self.firstContactDifferentValues = lbrynet.dht.contact.Contact('firstContactID', '192.168.1.20', 1000, None, 50)
def testBoolean(self):
""" Test "equals" and "not equals" comparisons """
self.failIfEqual(self.firstContact, self.secondContact, 'Contacts with different IDs should not be equal.')
self.failUnlessEqual(self.firstContact, self.firstContactDifferentValues, 'Contacts with same IDs should be equal, even if their other values differ.')
self.failUnlessEqual(self.secondContact, self.secondContactCopy, 'Different copies of the same Contact instance should be equal')
def testStringComparisons(self):
""" Test comparisons of Contact objects with str types """
self.failUnlessEqual('firstContactID', self.firstContact, 'The node ID string must be equal to the contact object')
self.failIfEqual('some random string', self.firstContact, "The tested string should not be equal to the contact object (not equal to it's ID)")
def testIllogicalComparisons(self):
""" Test comparisons with non-Contact and non-str types """
for item in (123, [1,2,3], {'key': 'value'}):
self.failIfEqual(self.firstContact, item, '"eq" operator: Contact object should not be equal to %s type' % type(item).__name__)
self.failUnless(self.firstContact != item, '"ne" operator: Contact object should not be equal to %s type' % type(item).__name__)
def testCompactIP(self):
self.assertEqual(self.firstContact.compact_ip(), '\x7f\x00\x00\x01')
self.assertEqual(self.secondContact.compact_ip(), '\xc0\xa8\x00\x01')
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ContactOperatorsTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())

View file

@ -1,10 +1,11 @@
import shutil
from multiprocessing import Process, Event, Queue
import io
import logging
from multiprocessing import Process, Event, Queue
import os
import platform
import shutil
import sys
import random
import io
import unittest
from Crypto.PublicKey import RSA
@ -16,9 +17,16 @@ from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManag
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, \
DBEncryptedFileMetadataManager
from lbrynet import analytics
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger
from lbrynet.core.Session import Session
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
@ -29,17 +37,29 @@ from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from twisted.internet import defer, threads, task
from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure
import os
from lbrynet.dht.node import Node
from tests.mocks import DummyBlobAvailabilityTracker
from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier
from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier
from tests import mocks
FakeNode = mocks.Node
FakeWallet = mocks.Wallet
FakePeerFinder = mocks.PeerFinder
FakeAnnouncer = mocks.Announcer
GenFile = mocks.GenFile
test_create_stream_sd_file = mocks.create_stream_sd_file
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
log_format = "%(funcName)s(): %(message)s"
logging.basicConfig(level=logging.WARNING, format=log_format)
@ -54,160 +74,7 @@ def require_system(system):
return unittest.skip("Skipping. Test can only be run on " + system)
class FakeNode(object):
def __init__(self, *args, **kwargs):
pass
def joinNetwork(self, *args):
pass
def stop(self):
pass
class FakeWallet(object):
def __init__(self):
self.private_key = RSA.generate(1024)
self.encoded_public_key = self.private_key.publickey().exportKey()
def start(self):
return defer.succeed(True)
def stop(self):
return defer.succeed(True)
def get_info_exchanger(self):
return PointTraderKeyExchanger(self)
def get_wallet_info_query_handler_factory(self):
return PointTraderKeyQueryHandlerFactory(self)
def reserve_points(self, *args):
return True
def cancel_point_reservation(self, *args):
pass
def send_points(self, *args):
return defer.succeed(True)
def add_expected_payment(self, *args):
pass
def get_balance(self):
return defer.succeed(1000)
def set_public_key_for_peer(self, peer, public_key):
pass
def get_claim_metadata_for_sd_hash(self, sd_hash):
return "fakeuri", "faketxid"
class FakePeerFinder(object):
def __init__(self, start_port, peer_manager, num_peers):
self.start_port = start_port
self.peer_manager = peer_manager
self.num_peers = num_peers
self.count = 0
def find_peers_for_blob(self, *args):
peer_port = self.start_port + self.count
self.count += 1
if self.count >= self.num_peers:
self.count = 0
return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)])
def run_manage_loop(self):
pass
def stop(self):
pass
class FakeAnnouncer(object):
def __init__(self, *args):
pass
def add_supplier(self, supplier):
pass
def immediate_announce(self, *args):
pass
def run_manage_loop(self):
pass
def stop(self):
pass
class GenFile(io.RawIOBase):
def __init__(self, size, pattern):
io.RawIOBase.__init__(self)
self.size = size
self.pattern = pattern
self.read_so_far = 0
self.buff = b''
self.last_offset = 0
def readable(self):
return True
def writable(self):
return False
def read(self, n=-1):
if n > -1:
bytes_to_read = min(n, self.size - self.read_so_far)
else:
bytes_to_read = self.size - self.read_so_far
output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
bytes_to_read -= len(output)
while bytes_to_read > 0:
self.buff = self._generate_chunk()
new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
bytes_to_read -= len(new_output)
output += new_output
self.read_so_far += len(output)
return output
def readall(self):
return self.read()
def _generate_chunk(self, n=2 ** 10):
output = self.pattern[self.last_offset:self.last_offset + n]
n_left = n - len(output)
whole_patterns = n_left / len(self.pattern)
output += self.pattern * whole_patterns
self.last_offset = n - len(output)
output += self.pattern[:self.last_offset]
return output
test_create_stream_sd_file = {
'stream_name': '746573745f66696c65',
'blobs': [
{'length': 2097152, 'blob_num': 0,
'blob_hash':
'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
'iv': '30303030303030303030303030303031'},
{'length': 2097152, 'blob_num': 1,
'blob_hash':
'f4067522c1b49432a2a679512e3917144317caa1abba0c041e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
'iv': '30303030303030303030303030303032'},
{'length': 1015056, 'blob_num': 2,
'blob_hash':
'305486c434260484fcb2968ce0e963b72f81ba56c11b08b1af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
'iv': '30303030303030303030303030303033'},
{'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'}],
'stream_type': 'lbryfile',
'key': '30313233343536373031323334353637',
'suggested_file_name': '746573745f66696c65',
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'}
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None, is_generous=False):
def use_epoll_on_linux():
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
@ -215,47 +82,63 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
class LbryUploader(object):
def __init__(self, sd_hash_queue, kill_event, dead_event,
file_size, ul_rate_limit=None, is_generous=False):
self.sd_hash_queue = sd_hash_queue
self.kill_event = kill_event
self.dead_event = dead_event
self.file_size = file_size
self.ul_rate_limit = ul_rate_limit
self.is_generous = is_generous
# these attributes get defined in `start`
self.reactor = None
self.sd_identifier = None
self.session = None
self.lbry_file_manager = None
self.server_port = None
self.kill_check = None
def start(self):
use_epoll_on_linux()
from twisted.internet import reactor
self.reactor = reactor
logging.debug("Starting the uploader")
Random.atfork()
r = random.Random()
r.seed("start_lbry_uploader")
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer()
rate_limiter = RateLimiter()
sd_identifier = StreamDescriptorIdentifier()
self.sd_identifier = StreamDescriptorIdentifier()
db_dir = "server"
os.mkdir(db_dir)
session = Session(settings.data_rate, db_dir=db_dir, lbryid="abcd",
self.session = Session(
settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=settings.is_generous_host)
dht_node_class=Node, is_generous=self.is_generous)
stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(
self.session, stream_info_manager, self.sd_identifier)
if self.ul_rate_limit is not None:
self.session.rate_limiter.set_ul_limit(self.ul_rate_limit)
reactor.callLater(1, self.start_all)
if not reactor.running:
reactor.run()
lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier)
if ul_rate_limit is not None:
session.rate_limiter.set_ul_limit(ul_rate_limit)
def start_all():
d = session.setup()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: lbry_file_manager.setup())
d.addCallback(lambda _: start_server())
d.addCallback(lambda _: create_stream())
d.addCallback(create_stream_descriptor)
d.addCallback(put_sd_hash_on_queue)
def start_all(self):
d = self.session.setup()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.start_server())
d.addCallback(lambda _: self.create_stream())
d.addCallback(self.create_stream_descriptor)
d.addCallback(self.put_sd_hash_on_queue)
def print_error(err):
logging.critical("Server error: %s", err.getErrorMessage())
@ -263,70 +146,60 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
d.addErrback(print_error)
return d
def start_server():
server_port = None
def start_server(self):
session = self.session
query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True,
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(
session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
server_factory = ServerProtocolFactory(session.rate_limiter,
query_handler_factories,
session.peer_manager)
server_port = reactor.listenTCP(5553, server_factory)
self.server_port = self.reactor.listenTCP(5553, server_factory)
logging.debug("Started listening")
def kill_server():
ds = []
ds.append(session.shut_down())
ds.append(lbry_file_manager.stop())
if server_port:
ds.append(server_port.stopListening())
kill_check.stop()
dead_event.set()
dl = defer.DeferredList(ds)
dl.addCallback(lambda _: reactor.stop())
return dl
def check_for_kill():
if kill_event.is_set():
kill_server()
kill_check = task.LoopingCall(check_for_kill)
kill_check.start(1.0)
self.kill_check = task.LoopingCall(self.check_for_kill)
self.kill_check.start(1.0)
return True
def create_stream():
test_file = GenFile(file_size, b''.join([chr(i) for i in xrange(0, 64, 6)]))
d = create_lbry_file(session, lbry_file_manager, "test_file", test_file)
def kill_server(self):
session = self.session
ds = []
ds.append(session.shut_down())
ds.append(self.lbry_file_manager.stop())
if self.server_port:
ds.append(self.server_port.stopListening())
self.kill_check.stop()
self.dead_event.set()
dl = defer.DeferredList(ds)
dl.addCallback(lambda _: self.reactor.stop())
return dl
def check_for_kill(self):
if self.kill_event.is_set():
self.kill_server()
def create_stream(self):
test_file = GenFile(self.file_size, b''.join([chr(i) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file)
return d
def create_stream_descriptor(stream_hash):
descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager)
d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True)
def create_stream_descriptor(self, stream_hash):
descriptor_writer = BlobStreamDescriptorWriter(self.session.blob_manager)
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(descriptor_writer.create_descriptor)
return d
def put_sd_hash_on_queue(sd_hash):
sd_hash_queue.put(sd_hash)
reactor.callLater(1, start_all)
if not reactor.running:
reactor.run()
def put_sd_hash_on_queue(self, sd_hash):
self.sd_hash_queue.put(sd_hash)
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None, is_generous=False):
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
def start_lbry_reuploader(sd_hash, kill_event, dead_event,
ready_event, n, ul_rate_limit=None, is_generous=False):
use_epoll_on_linux()
from twisted.internet import reactor
logging.debug("Starting the uploader")
@ -334,7 +207,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
Random.atfork()
r = random.Random()
r.seed("start_lbry_uploader")
r.seed("start_lbry_reuploader")
wallet = FakeWallet()
peer_port = 5553 + n
@ -393,8 +266,11 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
server_port = None
query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True,
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(
session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
@ -433,13 +309,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
def start_live_server(sd_hash_queue, kill_event, dead_event):
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
use_epoll_on_linux()
from twisted.internet import reactor
logging.debug("In start_server.")
@ -475,7 +345,8 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet,
session.payment_rate_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
@ -563,13 +434,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False):
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
use_epoll_on_linux()
from twisted.internet import reactor
logging.debug("Starting the uploader")
@ -618,7 +483,10 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
server_port = None
query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True,
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
@ -747,7 +615,8 @@ class TestTransfer(TestCase):
sd_hash_queue = Queue()
kill_event = Event()
dead_event = Event()
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343))
lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343)
uploader = Process(target=lbry_uploader.start)
uploader.start()
self.server_processes.append(uploader)
@ -765,22 +634,25 @@ class TestTransfer(TestCase):
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(settings.data_rate, db_dir=db_dir, lbryid="abcd",
self.session = Session(
settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=settings.is_generous_host)
dht_node_class=Node, is_generous=self.is_generous)
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
def make_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
chosen_options = [
o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash):
@ -852,11 +724,12 @@ class TestTransfer(TestCase):
db_dir = "client"
os.mkdir(db_dir)
self.session = Session(settings.data_rate, db_dir=db_dir, lbryid="abcd",
self.session = Session(
settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node,
is_generous=settings.is_generous_host)
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node
)
self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer)
@ -866,7 +739,8 @@ class TestTransfer(TestCase):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
chosen_options = [
o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm)
def start_lbry_file(lbry_file):
@ -925,7 +799,6 @@ class TestTransfer(TestCase):
return d
def test_last_blob_retrieval(self):
kill_event = Event()
dead_event_1 = Event()
blob_hash_queue_1 = Queue()
@ -953,11 +826,13 @@ class TestTransfer(TestCase):
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(settings.data_rate, db_dir=db_dir, lbryid="abcd",
self.session = Session(
settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=settings.is_generous_host)
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=settings.is_generous_host)
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@ -971,8 +846,8 @@ class TestTransfer(TestCase):
def download_blob(blob_hash):
prm = self.session.payment_rate_manager
downloader = StandaloneBlobDownloader(blob_hash, self.session.blob_manager, peer_finder,
rate_limiter, prm, wallet)
downloader = StandaloneBlobDownloader(
blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet)
d = downloader.download()
return d
@ -997,23 +872,20 @@ class TestTransfer(TestCase):
d1 = self.wait_for_event(dead_event_1, 15)
d2 = self.wait_for_event(dead_event_2, 15)
dl = defer.DeferredList([d1, d2])
def print_shutting_down():
logging.info("Client is shutting down")
dl.addCallback(lambda _: print_shutting_down())
dl.addCallback(lambda _: arg)
return dl
d.addBoth(stop)
return d
def test_double_download(self):
sd_hash_queue = Queue()
kill_event = Event()
dead_event = Event()
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343))
lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343)
uploader = Process(target=lbry_uploader.start)
uploader.start()
self.server_processes.append(uploader)
@ -1130,8 +1002,9 @@ class TestTransfer(TestCase):
kill_event = Event()
dead_events = [Event() for _ in range(num_uploaders)]
ready_events = [Event() for _ in range(1, num_uploaders)]
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_events[0],
9373419, 2 ** 22))
lbry_uploader = LbryUploader(
sd_hash_queue, kill_event, dead_events[0], 5209343, 9373419, 2**22)
uploader = Process(target=lbry_uploader.start)
uploader.start()
self.server_processes.append(uploader)
@ -1153,11 +1026,13 @@ class TestTransfer(TestCase):
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=settings.is_generous_host)
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=settings.is_generous_host)
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
def start_additional_uploaders(sd_hash):
for i in range(1, num_uploaders):
@ -1226,138 +1101,3 @@ class TestTransfer(TestCase):
d.addBoth(stop)
return d
class TestStreamify(TestCase):
def setUp(self):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.addCleanup(self.take_down_env)
self.is_generous = True
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
def delete_test_env():
shutil.rmtree('client')
if os.path.exists("test_file"):
os.remove("test_file")
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_create_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=settings.is_generous_host)
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
def verify_equal(sd_info):
self.assertEqual(sd_info, test_create_stream_sd_file)
def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
return d
def iv_generator():
iv = 0
while 1:
iv += 1
yield "%016d" % iv
def create_stream():
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
key="0123456701234567", iv_generator=iv_generator())
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
return d
def test_create_and_combine_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=settings.is_generous_host)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
def start_lbry_file(lbry_file):
logging.debug("Calling lbry_file.start()")
d = lbry_file.start()
return d
def combine_stream(stream_hash):
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(start_lbry_file)
def check_md5_sum():
f = open('test_file')
hashsum = MD5.new()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
d.addCallback(lambda _: check_md5_sum())
return d
def create_stream():
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
return create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
suggested_file_name="test_file")
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: create_stream())
d.addCallback(combine_stream)
return d

View file

@ -93,7 +93,7 @@ class TestReflector(unittest.TestCase):
use_upnp=False,
rate_limiter=rate_limiter,
wallet=wallet,
blob_tracker_class=mocks.DummyBlobAvailabilityTracker,
blob_tracker_class=mocks.BlobAvailabilityTracker,
dht_node_class=Node
)

View file

@ -0,0 +1,172 @@
import logging
import os
import shutil
from Crypto.Hash import MD5
from twisted.trial.unittest import TestCase
from twisted.internet import defer, threads
from lbrynet import settings
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from tests import mocks
FakeNode = mocks.Node
FakeWallet = mocks.Wallet
FakePeerFinder = mocks.PeerFinder
FakeAnnouncer = mocks.Announcer
GenFile = mocks.GenFile
test_create_stream_sd_file = mocks.create_stream_sd_file
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
class TestStreamify(TestCase):
def setUp(self):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.addCleanup(self.take_down_env)
self.is_generous = True
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
def delete_test_env():
shutil.rmtree('client')
if os.path.exists("test_file"):
os.remove("test_file")
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_create_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(
settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=self.is_generous
)
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
def verify_equal(sd_info):
self.assertEqual(sd_info, test_create_stream_sd_file)
def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
return d
def iv_generator():
iv = 0
while 1:
iv += 1
yield "%016d" % iv
def create_stream():
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
key="0123456701234567", iv_generator=iv_generator())
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
return d
def test_create_and_combine_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(
settings.data_rate, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker
)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
def start_lbry_file(lbry_file):
logging.debug("Calling lbry_file.start()")
d = lbry_file.start()
return d
def combine_stream(stream_hash):
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(start_lbry_file)
def check_md5_sum():
f = open('test_file')
hashsum = MD5.new()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
d.addCallback(lambda _: check_md5_sum())
return d
def create_stream():
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
return create_lbry_file(
self.session, self.lbry_file_manager, "test_file", test_file,
suggested_file_name="test_file")
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: create_stream())
d.addCallback(combine_stream)
return d

View file

@ -5,7 +5,7 @@ from decimal import Decimal
from twisted.internet import defer
from lbrynet.core import PTCWallet
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker
from lbrynet.core import BlobAvailability
class Node(object):
@ -54,6 +54,9 @@ class Wallet(object):
def set_public_key_for_peer(self, peer, public_key):
pass
def get_claim_metadata_for_sd_hash(self, sd_hash):
return "fakeuri", "faketxid"
class PeerFinder(object):
def __init__(self, start_port, peer_manager, num_peers):
@ -136,7 +139,7 @@ class GenFile(io.RawIOBase):
return output
class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker):
"""
Class to track peer counts for known blobs, and to discover new popular blobs

View file

View file

@ -0,0 +1,38 @@
from lbrynet.analytics import events
from twisted.trial import unittest
from tests import util
class EventsTest(unittest.TestCase):
def setUp(self):
util.resetTime(self)
self.event_generator = events.Events('any valid json datatype', 'lbry123', 'session456')
def test_heartbeat(self):
result = self.event_generator.heartbeat()
desired_result = {
'context': 'any valid json datatype',
'event': 'Heartbeat',
'properties': {'lbry_id': 'lbry123', 'session_id': 'session456'},
'timestamp': '2016-01-01T00:00:00Z',
'userId': 'lbry'
}
self.assertEqual(desired_result, result)
def test_download_started(self):
result = self.event_generator.download_started('great gatsby')
desired_result = {
'context': 'any valid json datatype',
'event': 'Download Started',
'properties': {
'lbry_id': 'lbry123',
'session_id': 'session456',
'name': 'great gatsby',
'stream_info': None,
},
'timestamp': '2016-01-01T00:00:00Z',
'userId': 'lbry'
}
self.assertEqual(desired_result, result)

View file

@ -0,0 +1,27 @@
from lbrynet import analytics
from twisted.trial import unittest
class TrackTest(unittest.TestCase):
def test_empty_summarize_is_None(self):
track = analytics.Track()
_, result = track.summarize_and_reset('a')
self.assertEqual(None, result)
def test_can_get_sum_of_metric(self):
track = analytics.Track()
track.add_observation('b', 1)
track.add_observation('b', 2)
_, result = track.summarize_and_reset('b')
self.assertEqual(3, result)
def test_summarize_resets_metric(self):
track = analytics.Track()
track.add_observation('metric', 1)
track.add_observation('metric', 2)
track.summarize_and_reset('metric')
_, result = track.summarize_and_reset('metric')
self.assertEqual(None, result)

View file

@ -5,17 +5,20 @@ from twisted.internet import defer
from twisted.test import proto_helpers
from twisted.trial import unittest
from lbrynet import analytics
from lbrynet.core import Peer
from lbrynet.core.server import BlobRequestHandler
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from tests.mocks import DummyBlobAvailabilityTracker
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
class TestBlobRequestHandlerQueries(unittest.TestCase):
def setUp(self):
self.blob_manager = mock.Mock()
self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker())
self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager)
self.payment_rate_manager = NegotiatedPaymentRateManager(
BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker())
self.handler = BlobRequestHandler.BlobRequestHandler(
self.blob_manager, None, self.payment_rate_manager, None)
def test_empty_response_when_empty_query(self):
self.assertEqual({}, self.successResultOf(self.handler.handle_queries({})))
@ -107,7 +110,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
class TestBlobRequestHandlerSender(unittest.TestCase):
def test_nothing_happens_if_not_currently_uploading(self):
handler = BlobRequestHandler.BlobRequestHandler(None, None, None)
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
handler.currently_uploading = None
deferred = handler.send_blob_if_requested(None)
self.assertEqual(True, self.successResultOf(deferred))
@ -116,7 +119,8 @@ class TestBlobRequestHandlerSender(unittest.TestCase):
# TODO: also check that the expected payment values are set
consumer = proto_helpers.StringTransport()
test_file = StringIO.StringIO('test')
handler = BlobRequestHandler.BlobRequestHandler(None, None, None)
track = analytics.Track()
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, track)
handler.peer = mock.create_autospec(Peer.Peer)
handler.currently_uploading = mock.Mock()
handler.read_handle = test_file

View file

@ -1,9 +1,10 @@
import mock
from lbrynet.metadata import Fee
from lbrynet.lbrynet_daemon import ExchangeRateManager
from twisted.trial import unittest
from tests import util
class FeeFormatTest(unittest.TestCase):
def test_fee_created_with_correct_inputs(self):
@ -19,10 +20,7 @@ class FeeFormatTest(unittest.TestCase):
class FeeTest(unittest.TestCase):
def setUp(self):
patcher = mock.patch('time.time')
self.time = patcher.start()
self.time.return_value = 0
self.addCleanup(patcher.stop)
util.resetTime(self)
def test_fee_converts_to_lbc(self):
fee_dict = {
@ -31,6 +29,10 @@ class FeeTest(unittest.TestCase):
'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
}
}
rates = {'BTCLBC': {'spot': 3.0, 'ts': 2}, 'USDBTC': {'spot': 2.0, 'ts': 3}}
rates = {
'BTCLBC': {'spot': 3.0, 'ts': util.DEFAULT_ISO_TIME + 1},
'USDBTC': {'spot': 2.0, 'ts': util.DEFAULT_ISO_TIME + 2}
}
manager = ExchangeRateManager.DummyExchangeRateManager(rates)
self.assertEqual(60.0, manager.to_lbc(fee_dict).amount)
result = manager.to_lbc(fee_dict).amount
self.assertEqual(60.0, result)

View file

@ -5,7 +5,7 @@ import mock
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy
from lbrynet.core.Offer import Offer
from tests.mocks import DummyBlobAvailabilityTracker
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
MAX_NEGOTIATION_TURNS = 10
random.seed(12345)

View file

View file

@ -0,0 +1,49 @@
import unittest
from lbrynet.dht import contact
class ContactOperatorsTest(unittest.TestCase):
""" Basic tests case for boolean operators on the Contact class """
def setUp(self):
self.firstContact = contact.Contact('firstContactID', '127.0.0.1', 1000, None, 1)
self.secondContact = contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32)
self.secondContactCopy = contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32)
self.firstContactDifferentValues = contact.Contact(
'firstContactID', '192.168.1.20', 1000, None, 50)
def testBoolean(self):
""" Test "equals" and "not equals" comparisons """
self.failIfEqual(
self.firstContact, self.secondContact,
'Contacts with different IDs should not be equal.')
self.failUnlessEqual(
self.firstContact, self.firstContactDifferentValues,
'Contacts with same IDs should be equal, even if their other values differ.')
self.failUnlessEqual(
self.secondContact, self.secondContactCopy,
'Different copies of the same Contact instance should be equal')
def testStringComparisons(self):
""" Test comparisons of Contact objects with str types """
self.failUnlessEqual(
'firstContactID', self.firstContact,
'The node ID string must be equal to the contact object')
self.failIfEqual(
'some random string', self.firstContact,
"The tested string should not be equal to the contact object (not equal to it's ID)")
def testIllogicalComparisons(self):
""" Test comparisons with non-Contact and non-str types """
msg = '"{}" operator: Contact object should not be equal to {} type'
for item in (123, [1,2,3], {'key': 'value'}):
self.failIfEqual(
self.firstContact, item,
msg.format('eq', type(item).__name__))
self.failUnless(
self.firstContact != item,
msg.format('ne', type(item).__name__))
def testCompactIP(self):
self.assertEqual(self.firstContact.compact_ip(), '\x7f\x00\x00\x01')
self.assertEqual(self.secondContact.compact_ip(), '\xc0\xa8\x00\x01')

23
tests/util.py Normal file
View file

@ -0,0 +1,23 @@
import datetime
import time
import mock
DEFAULT_TIMESTAMP = datetime.datetime(2016, 1, 1)
DEFAULT_ISO_TIME = time.mktime(DEFAULT_TIMESTAMP.timetuple())
def resetTime(test_case, timestamp=DEFAULT_TIMESTAMP):
iso_time = time.mktime(timestamp.timetuple())
patcher = mock.patch('time.time')
patcher.start().return_value = iso_time
test_case.addCleanup(patcher.stop)
patcher = mock.patch('lbrynet.core.utils.now')
patcher.start().return_value = timestamp
test_case.addCleanup(patcher.stop)
patcher = mock.patch('lbrynet.core.utils.utcnow')
patcher.start().return_value = timestamp
test_case.addCleanup(patcher.stop)