forked from LBRYCommunity/lbry-sdk
Merge branch 'master' into error-messages
# Conflicts: # lbrynet/lbrynet_daemon/Daemon.py # lbrynet/lbrynet_daemon/auth/server.py
This commit is contained in:
commit
0a9ef07787
28 changed files with 497 additions and 331 deletions
|
@ -1,5 +1,5 @@
|
|||
[bumpversion]
|
||||
current_version = 0.7.6rc0
|
||||
current_version = 0.7.6
|
||||
commit = True
|
||||
tag = True
|
||||
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)((?P<release>[a-z]+)(?P<candidate>\d+))?
|
||||
|
|
|
@ -27,7 +27,7 @@ cache:
|
|||
before_install:
|
||||
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then ./packaging/travis/setup_osx.sh; fi
|
||||
- mkdir -p lbrynet/resources/ui
|
||||
- ./packaging/travis/setup_qa.sh
|
||||
- ./packaging/travis/setup_build.sh
|
||||
|
||||
install:
|
||||
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then ./packaging/travis/install_dependencies_and_run_tests.sh; fi
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import logging
|
||||
|
||||
__version__ = "0.7.6rc0"
|
||||
__version__ = "0.7.6"
|
||||
version = tuple(__version__.split('.'))
|
||||
|
||||
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
||||
|
|
|
@ -56,11 +56,10 @@ class Api(object):
|
|||
def track(self, event):
|
||||
"""Send a single tracking event"""
|
||||
log.debug('Sending track event: %s', event)
|
||||
import base64
|
||||
return self.session.post(self.url + '/track', json=event, auth=self.auth)
|
||||
|
||||
@classmethod
|
||||
def load(cls, session=None):
|
||||
def new_instance(cls, session=None):
|
||||
"""Initialize an instance using values from the configuration"""
|
||||
if not session:
|
||||
session = sessions.FuturesSession()
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import logging
|
||||
|
||||
from lbrynet.core import utils
|
||||
|
||||
from lbrynet.conf import LBRYUM_WALLET
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -22,6 +22,20 @@ class Events(object):
|
|||
self.lbry_id = lbry_id
|
||||
self.session_id = session_id
|
||||
|
||||
def update_context(self, context):
|
||||
self.context = context
|
||||
|
||||
def server_startup(self):
|
||||
return self._event('Server Startup')
|
||||
|
||||
def server_startup_success(self):
|
||||
return self._event('Server Startup Success')
|
||||
|
||||
def server_startup_error(self, message):
|
||||
return self._event('Server Startup Error', {
|
||||
'message': message,
|
||||
})
|
||||
|
||||
def heartbeat(self):
|
||||
return self._event('Heartbeat')
|
||||
|
||||
|
@ -32,6 +46,13 @@ class Events(object):
|
|||
}
|
||||
return self._event('Download Started', properties)
|
||||
|
||||
def error(self, message, sd_hash=None):
|
||||
properties = {
|
||||
'message': message,
|
||||
'stream_info': sd_hash
|
||||
}
|
||||
return self._event('Error', properties)
|
||||
|
||||
def metric_observed(self, metric_name, value):
|
||||
properties = {
|
||||
'value': value,
|
||||
|
@ -57,19 +78,18 @@ class Events(object):
|
|||
return properties
|
||||
|
||||
|
||||
def make_context(platform, wallet, is_dev=False):
|
||||
# TODO: distinguish between developer and release instances
|
||||
def make_context(platform, wallet):
|
||||
return {
|
||||
'is_dev': is_dev,
|
||||
'app': {
|
||||
'name': 'lbrynet',
|
||||
'version': platform['lbrynet_version'],
|
||||
'ui_version': platform['ui_version'],
|
||||
'python_version': platform['python_version'],
|
||||
'build': platform['build'],
|
||||
'wallet': {
|
||||
'name': wallet,
|
||||
# TODO: add in version info for lbrycrdd
|
||||
'version': platform['lbryum_version'] if wallet == 'lbryum' else None
|
||||
'version': platform['lbryum_version'] if wallet == LBRYUM_WALLET else None
|
||||
},
|
||||
},
|
||||
# TODO: expand os info to give linux/osx specific info
|
||||
|
|
|
@ -3,7 +3,13 @@ from lbrynet.core import looping_call_manager
|
|||
from twisted.internet import defer
|
||||
from twisted.internet import task
|
||||
|
||||
from lbrynet.core.Platform import get_platform
|
||||
from lbrynet.conf import settings
|
||||
|
||||
import constants
|
||||
from api import Api
|
||||
from events import Events, make_context
|
||||
from track import Track
|
||||
|
||||
|
||||
class Manager(object):
|
||||
|
@ -12,28 +18,63 @@ class Manager(object):
|
|||
self.events_generator = events_generator
|
||||
self.track = track
|
||||
self.looping_call_manager = self.setup_looping_calls()
|
||||
self.is_started = False
|
||||
|
||||
@classmethod
|
||||
def new_instance(cls, api=None, events=None):
|
||||
if api is None:
|
||||
api = Api.new_instance()
|
||||
if events is None:
|
||||
events = Events(
|
||||
make_context(get_platform(), settings.wallet),
|
||||
'not loaded', 'not loaded'
|
||||
)
|
||||
return cls(api, events, Track())
|
||||
|
||||
def update_events_generator(self, events_generator):
|
||||
self.events_generator = events_generator
|
||||
|
||||
def _get_looping_calls(self):
|
||||
return [
|
||||
('send_heartbeat', self._send_heartbeat, 60),
|
||||
('update_tracked_metrics', self._update_tracked_metrics, 300),
|
||||
]
|
||||
|
||||
def setup_looping_calls(self):
|
||||
call_manager = looping_call_manager.LoopingCallManager()
|
||||
looping_calls = [
|
||||
('send_heartbeat', self._send_heartbeat),
|
||||
('update_tracked_metrics', self._update_tracked_metrics),
|
||||
]
|
||||
for name, fn in looping_calls:
|
||||
for name, fn, _ in self._get_looping_calls():
|
||||
call_manager.register_looping_call(name, task.LoopingCall(fn))
|
||||
return call_manager
|
||||
|
||||
def start(self):
|
||||
self.looping_call_manager.start('send_heartbeat', 60)
|
||||
self.looping_call_manager.start('update_tracked_metrics', 300)
|
||||
if not self.is_started:
|
||||
for name, _, interval in self._get_looping_calls():
|
||||
self.looping_call_manager.start(name, interval)
|
||||
self.is_started = True
|
||||
|
||||
def shutdown(self):
|
||||
self.looping_call_manager.shutdown()
|
||||
|
||||
def send_server_startup(self):
|
||||
event = self.events_generator.server_startup()
|
||||
self.analytics_api.track(event)
|
||||
|
||||
def send_server_startup_success(self):
|
||||
event = self.events_generator.server_startup_success()
|
||||
self.analytics_api.track(event)
|
||||
|
||||
def send_server_startup_error(self, message):
|
||||
event = self.events_generator.server_startup_error(message)
|
||||
self.analytics_api.track(event)
|
||||
|
||||
def send_download_started(self, name, stream_info=None):
|
||||
event = self.events_generator.download_started(name, stream_info)
|
||||
self.analytics_api.track(event)
|
||||
|
||||
def send_error(self, message, sd_hash=None):
|
||||
event = self.events_generator.error(message, sd_hash)
|
||||
self.analytics_api.track(event)
|
||||
|
||||
def register_repeating_metric(self, event_name, value_generator, frequency=300):
|
||||
lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator)
|
||||
self.looping_call_manager.register_looping_call(event_name, lcall)
|
||||
|
|
2
lbrynet/build_type.py
Normal file
2
lbrynet/build_type.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
# dont touch this. Travis changes this during build/deployment
|
||||
BUILD = "dev"
|
|
@ -7,6 +7,9 @@ import yaml
|
|||
|
||||
from appdirs import user_data_dir
|
||||
|
||||
LBRYCRD_WALLET = 'lbrycrd'
|
||||
LBRYUM_WALLET = 'lbryum'
|
||||
PTC_WALLET = 'ptc'
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -172,7 +175,7 @@ class AdjustableSettings(Setting):
|
|||
]
|
||||
self.pointtrader_server = 'http://127.0.0.1:2424'
|
||||
self.reflector_servers = [("reflector.lbry.io", 5566)]
|
||||
self.wallet = "lbryum"
|
||||
self.wallet = LBRYUM_WALLET
|
||||
self.ui_branch = "master"
|
||||
self.default_ui_branch = 'master'
|
||||
self.data_dir = default_data_dir
|
||||
|
@ -200,7 +203,7 @@ class ApplicationSettings(Setting):
|
|||
self.ICON_PATH = "icons" if platform is WINDOWS else "app.icns"
|
||||
self.APP_NAME = "LBRY"
|
||||
self.PROTOCOL_PREFIX = "lbry"
|
||||
self.WALLET_TYPES = ["lbryum", "lbrycrd"]
|
||||
self.WALLET_TYPES = [LBRYUM_WALLET, LBRYCRD_WALLET]
|
||||
self.SOURCE_TYPES = ['lbry_sd_hash', 'url', 'btih']
|
||||
self.CURRENCIES = {
|
||||
'BTC': {'type': 'crypto'},
|
||||
|
|
|
@ -341,7 +341,6 @@ class HashBlobCreator(object):
|
|||
else:
|
||||
self.blob_hash = self.hashsum.hexdigest()
|
||||
d = self._close()
|
||||
|
||||
if self.blob_hash is not None:
|
||||
d.addCallback(lambda _: self.blob_manager.creator_finished(self))
|
||||
d.addCallback(lambda _: self.blob_hash)
|
||||
|
@ -394,4 +393,4 @@ class TempBlobCreator(HashBlobCreator):
|
|||
return defer.succeed(True)
|
||||
|
||||
def _write(self, data):
|
||||
self.data_buffer += data
|
||||
self.data_buffer += data
|
||||
|
|
29
lbrynet/core/Platform.py
Normal file
29
lbrynet/core/Platform.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
import platform
|
||||
import simplejson as json
|
||||
|
||||
from urllib2 import urlopen
|
||||
|
||||
from lbrynet import __version__ as lbrynet_version
|
||||
from lbrynet import build_type
|
||||
from lbryum.version import LBRYUM_VERSION as lbryum_version
|
||||
|
||||
|
||||
def get_platform():
|
||||
p = {
|
||||
"processor": platform.processor(),
|
||||
"python_version": platform.python_version(),
|
||||
"platform": platform.platform(),
|
||||
"os_release": platform.release(),
|
||||
"os_system": platform.system(),
|
||||
"lbrynet_version": lbrynet_version,
|
||||
"lbryum_version": lbryum_version,
|
||||
"ui_version": "not loaded yet",
|
||||
"build": build_type.BUILD, # travis sets this during build step
|
||||
}
|
||||
|
||||
try:
|
||||
p['ip'] = json.load(urlopen('http://jsonip.com'))['ip']
|
||||
except:
|
||||
p['ip'] = "Could not determine IP"
|
||||
|
||||
return p
|
|
@ -171,13 +171,14 @@ def convert_verbose(verbose):
|
|||
should be at the info level.
|
||||
if --verbose is provided, but not followed by any arguments, then
|
||||
args.verbose = [] and debug logging should be enabled for all of lbrynet
|
||||
along with info logging on lbryum.
|
||||
if --verbose is provided and followed by arguments, those arguments
|
||||
will be in a list
|
||||
"""
|
||||
if verbose is None:
|
||||
return []
|
||||
if verbose == []:
|
||||
return ['lbrynet']
|
||||
return ['lbrynet', 'lbryum']
|
||||
return verbose
|
||||
|
||||
|
||||
|
@ -204,6 +205,13 @@ def configure_logging(file_name, console, verbose=None):
|
|||
# allow info.
|
||||
level = 'DEBUG' if verbose else 'INFO'
|
||||
handler = configure_console(level=level)
|
||||
if 'lbryum' in verbose:
|
||||
# TODO: this enables lbryum logging on the other handlers
|
||||
# too which isn't consistent with how verbose logging
|
||||
# happens with other loggers. Should change the configuration
|
||||
# so that its only logging at the INFO level for the console.
|
||||
logging.getLogger('lbryum').setLevel(logging.INFO)
|
||||
verbose.remove('lbryum')
|
||||
if verbose:
|
||||
handler.addFilter(LoggerNameFilter(verbose))
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ import binascii
|
|||
import logging.handlers
|
||||
import mimetypes
|
||||
import os
|
||||
import platform
|
||||
import random
|
||||
import re
|
||||
import subprocess
|
||||
|
@ -14,6 +13,7 @@ from urllib2 import urlopen
|
|||
from appdirs import user_data_dir
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
|
||||
from twisted.web import server
|
||||
from twisted.internet import defer, threads, error, reactor, task
|
||||
from twisted.internet.task import LoopingCall
|
||||
|
@ -24,6 +24,7 @@ from jsonschema import ValidationError
|
|||
from lbryum.version import LBRYUM_VERSION as lbryum_version
|
||||
from lbrynet import __version__ as lbrynet_version
|
||||
from lbrynet import conf, reflector, analytics
|
||||
from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET
|
||||
from lbrynet.metadata.Fee import FeeValidator
|
||||
from lbrynet.metadata.Metadata import Metadata, verify_name_characters
|
||||
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory, EncryptedFileOpenerFactory
|
||||
|
@ -38,8 +39,7 @@ from lbrynet.lbrynet_daemon.Downloader import GetStream
|
|||
from lbrynet.lbrynet_daemon.Publisher import Publisher
|
||||
from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager
|
||||
from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.core import utils
|
||||
from lbrynet.core import log_support, utils, Platform
|
||||
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader
|
||||
from lbrynet.core.Session import Session
|
||||
from lbrynet.core.PTCWallet import PTCWallet
|
||||
|
@ -199,7 +199,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
LBRYnet daemon, a jsonrpc interface to lbry functions
|
||||
"""
|
||||
|
||||
def __init__(self, root):
|
||||
def __init__(self, root, analytics_manager):
|
||||
AuthJSONRPCServer.__init__(self, conf.settings.use_auth_http)
|
||||
reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
|
||||
|
||||
|
@ -245,13 +245,12 @@ class Daemon(AuthJSONRPCServer):
|
|||
|
||||
self.startup_status = STARTUP_STAGES[0]
|
||||
self.startup_message = None
|
||||
self.announced_startup = False
|
||||
self.connected_to_internet = True
|
||||
self.connection_problem = None
|
||||
self.git_lbrynet_version = None
|
||||
self.git_lbryum_version = None
|
||||
self.ui_version = None
|
||||
self.ip = None
|
||||
self.platform = None
|
||||
self.first_run = None
|
||||
self.log_file = conf.settings.get_log_filename()
|
||||
self.current_db_revision = 1
|
||||
|
@ -263,7 +262,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
# of the daemon, but I don't want to deal with that now
|
||||
self.log_uploader = log_support.LogUploader.load('lbrynet', self.log_file)
|
||||
|
||||
self.analytics_manager = None
|
||||
self.analytics_manager = analytics_manager
|
||||
self.lbryid = PENDING_LBRY_ID
|
||||
self.daemon_conf = conf.settings.get_conf_filename()
|
||||
|
||||
|
@ -297,13 +296,14 @@ class Daemon(AuthJSONRPCServer):
|
|||
content = request.content.read()
|
||||
parsed = jsonrpclib.loads(content)
|
||||
function_path = parsed.get("method")
|
||||
if self.wallet_type == "lbryum" and function_path in ['set_miner', 'get_miner_status']:
|
||||
if self.wallet_type == LBRYUM_WALLET and function_path in ['set_miner', 'get_miner_status']:
|
||||
log.warning("Mining commands are not available in lbryum")
|
||||
raise Exception("Command not available in lbryum")
|
||||
return True
|
||||
|
||||
def set_wallet_attributes(self):
|
||||
self.wallet_dir = None
|
||||
if self.wallet_type != "lbrycrd":
|
||||
if self.wallet_type != LBRYCRD_WALLET:
|
||||
return
|
||||
if os.name == "nt":
|
||||
from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle
|
||||
|
@ -387,24 +387,10 @@ class Daemon(AuthJSONRPCServer):
|
|||
return d
|
||||
|
||||
def _get_platform(self):
|
||||
r = {
|
||||
"processor": platform.processor(),
|
||||
"python_version": platform.python_version(),
|
||||
"platform": platform.platform(),
|
||||
"os_release": platform.release(),
|
||||
"os_system": platform.system(),
|
||||
"lbrynet_version": lbrynet_version,
|
||||
"lbryum_version": lbryum_version,
|
||||
"ui_version": self.lbry_ui_manager.loaded_git_version,
|
||||
}
|
||||
if not self.ip:
|
||||
try:
|
||||
r['ip'] = json.load(urlopen('http://jsonip.com'))['ip']
|
||||
self.ip = r['ip']
|
||||
except:
|
||||
r['ip'] = "Could not determine"
|
||||
|
||||
return r
|
||||
if self.platform is None:
|
||||
self.platform = Platform.get_platform()
|
||||
self.platform["ui_version"] = self.lbry_ui_manager.loaded_git_version
|
||||
return self.platform
|
||||
|
||||
def _initial_setup(self):
|
||||
def _log_platform():
|
||||
|
@ -532,6 +518,11 @@ class Daemon(AuthJSONRPCServer):
|
|||
return defer.succeed(True)
|
||||
return defer.succeed(True)
|
||||
|
||||
def _stop_file_manager(self):
|
||||
if self.lbry_file_manager:
|
||||
self.lbry_file_manager.stop()
|
||||
return defer.succeed(True)
|
||||
|
||||
def _stop_server(self):
|
||||
try:
|
||||
if self.lbry_server_port is not None:
|
||||
|
@ -598,7 +589,10 @@ class Daemon(AuthJSONRPCServer):
|
|||
id_hash = base58.b58encode(self.lbryid)[:20]
|
||||
else:
|
||||
id_hash = self.lbryid
|
||||
self.log_uploader.upload(exclude_previous, self.lbryid, log_type)
|
||||
try:
|
||||
self.log_uploader.upload(exclude_previous, self.lbryid, log_type)
|
||||
except requests.RequestException:
|
||||
log.exception('Failed to upload log file')
|
||||
return defer.succeed(None)
|
||||
|
||||
def _clean_up_temp_files(self):
|
||||
|
@ -626,11 +620,12 @@ class Daemon(AuthJSONRPCServer):
|
|||
except Exception:
|
||||
log.warn('Failed to upload log', exc_info=True)
|
||||
d = defer.succeed(None)
|
||||
|
||||
d.addCallback(lambda _: self._stop_server())
|
||||
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
|
||||
d.addCallback(lambda _: self._stop_reflector())
|
||||
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
|
||||
d.addCallback(lambda _: self.lbry_file_manager.stop())
|
||||
d.addCallback(lambda _: self._stop_file_manager())
|
||||
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
|
||||
if self.session is not None:
|
||||
d.addCallback(lambda _: self.session.shut_down())
|
||||
|
@ -756,18 +751,23 @@ class Daemon(AuthJSONRPCServer):
|
|||
return d
|
||||
|
||||
def _get_analytics(self):
|
||||
analytics_api = analytics.Api.load()
|
||||
context = analytics.make_context(self._get_platform(), self.wallet_type)
|
||||
events_generator = analytics.Events(
|
||||
context, base58.b58encode(self.lbryid), self._session_id)
|
||||
self.analytics_manager = analytics.Manager(
|
||||
analytics_api, events_generator, analytics.Track())
|
||||
self.analytics_manager.start()
|
||||
self.analytics_manager.register_repeating_metric(
|
||||
analytics.BLOB_BYTES_AVAILABLE,
|
||||
AlwaysSend(calculate_available_blob_size, self.session.blob_manager),
|
||||
frequency=300
|
||||
)
|
||||
if self.analytics_manager is None:
|
||||
self.analytics_manager = analytics.Manager.new_instance(
|
||||
events=events_generator
|
||||
)
|
||||
else:
|
||||
self.analytics_manager.update_events_generator(events_generator)
|
||||
|
||||
if not self.analytics_manager.is_started:
|
||||
self.analytics_manager.start()
|
||||
self.analytics_manager.register_repeating_metric(
|
||||
analytics.BLOB_BYTES_AVAILABLE,
|
||||
AlwaysSend(calculate_available_blob_size, self.session.blob_manager),
|
||||
frequency=300
|
||||
)
|
||||
|
||||
def _get_session(self):
|
||||
def get_default_data_rate():
|
||||
|
@ -777,25 +777,25 @@ class Daemon(AuthJSONRPCServer):
|
|||
return d
|
||||
|
||||
def get_wallet():
|
||||
if self.wallet_type == "lbrycrd":
|
||||
if self.wallet_type == LBRYCRD_WALLET:
|
||||
log.info("Using lbrycrd wallet")
|
||||
wallet = LBRYcrdWallet(self.db_dir,
|
||||
wallet_dir=self.wallet_dir,
|
||||
wallet_conf=self.lbrycrd_conf,
|
||||
lbrycrdd_path=self.lbrycrdd_path)
|
||||
d = defer.succeed(wallet)
|
||||
elif self.wallet_type == "lbryum":
|
||||
elif self.wallet_type == LBRYUM_WALLET:
|
||||
log.info("Using lbryum wallet")
|
||||
config = {'auto-connect': True}
|
||||
config = {'auto_connect': True}
|
||||
if conf.settings.lbryum_wallet_dir:
|
||||
config['lbryum_path'] = conf.settings.lbryum_wallet_dir
|
||||
d = defer.succeed(LBRYumWallet(self.db_dir, config))
|
||||
elif self.wallet_type == "ptc":
|
||||
elif self.wallet_type == PTC_WALLET:
|
||||
log.info("Using PTC wallet")
|
||||
d = defer.succeed(PTCWallet(self.db_dir))
|
||||
else:
|
||||
raise ValueError('Wallet Type {} is not valid'.format(self.wallet_type))
|
||||
d.addCallback(lambda wallet: {"wallet": wallet})
|
||||
d.addCallback(lambda w: {"wallet": w})
|
||||
return d
|
||||
|
||||
d1 = get_default_data_rate()
|
||||
|
@ -840,11 +840,14 @@ class Daemon(AuthJSONRPCServer):
|
|||
|
||||
def eb():
|
||||
if not r.called:
|
||||
self.analytics_manager.send_error("sd blob download timed out", sd_hash)
|
||||
r.errback(Exception("sd timeout"))
|
||||
|
||||
r = defer.Deferred(None)
|
||||
reactor.callLater(timeout, eb)
|
||||
d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager)
|
||||
d.addErrback(lambda err: self.analytics_manager.send_error(
|
||||
"error downloading sd blob: " + err, sd_hash))
|
||||
d.addCallback(BlobStreamDescriptorReader)
|
||||
d.addCallback(lambda blob: blob.get_info())
|
||||
d.addCallback(cb)
|
||||
|
@ -998,50 +1001,26 @@ class Daemon(AuthJSONRPCServer):
|
|||
def _reflect(self, lbry_file):
|
||||
if not lbry_file:
|
||||
return defer.fail(Exception("no lbry file given to reflect"))
|
||||
|
||||
stream_hash = lbry_file.stream_hash
|
||||
|
||||
if stream_hash is None:
|
||||
return defer.fail(Exception("no stream hash"))
|
||||
|
||||
log.info("Reflecting stream: %s" % stream_hash)
|
||||
|
||||
reflector_server = random.choice(conf.settings.reflector_servers)
|
||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||
log.info("Start reflector client")
|
||||
factory = reflector.ClientFactory(
|
||||
self.session.blob_manager,
|
||||
self.lbry_file_manager.stream_info_manager,
|
||||
stream_hash
|
||||
)
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
return d
|
||||
return run_reflector_factory(factory)
|
||||
|
||||
def _reflect_blobs(self, blob_hashes):
|
||||
if not blob_hashes:
|
||||
return defer.fail(Exception("no lbry file given to reflect"))
|
||||
|
||||
log.info("Reflecting %i blobs" % len(blob_hashes))
|
||||
|
||||
reflector_server = random.choice(conf.settings.reflector_servers)
|
||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||
log.info("Start reflector client")
|
||||
factory = reflector.BlobClientFactory(
|
||||
self.session.blob_manager,
|
||||
blob_hashes
|
||||
)
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
return d
|
||||
|
||||
def _log_to_slack(self, msg):
|
||||
URL = "https://hooks.slack.com/services/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"
|
||||
msg = platform.platform() + ": " + base58.b58encode(self.lbryid)[:20] + ", " + msg
|
||||
requests.post(URL, json.dumps({"text": msg}))
|
||||
return defer.succeed(None)
|
||||
return run_reflector_factory(factory)
|
||||
|
||||
def _run_scripts(self):
|
||||
if len([k for k in self.startup_scripts if 'run_once' in k.keys()]):
|
||||
|
@ -1101,7 +1080,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
r['message'] = self.connection_problem[1]
|
||||
r['is_lagging'] = True
|
||||
elif self.startup_status[0] == LOADING_wallet_CODE:
|
||||
if self.wallet_type == 'lbryum':
|
||||
if self.wallet_type == LBRYUM_WALLET:
|
||||
if self.session.wallet.blocks_behind_alert != 0:
|
||||
r['message'] = r['message'] % (str(self.session.wallet.blocks_behind_alert) + " blocks behind")
|
||||
r['progress'] = self.session.wallet.catchup_progress
|
||||
|
@ -1331,8 +1310,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
"""
|
||||
|
||||
d = self._get_lbry_files()
|
||||
d.addCallback(lambda r: [d[1] for d in r])
|
||||
d.addCallback(lambda r: self._render_response(r, OK_CODE) if len(r) else self._render_response(False, OK_CODE))
|
||||
d.addCallback(lambda r: self._render_response([d[1] for d in r], OK_CODE))
|
||||
|
||||
return d
|
||||
|
||||
|
@ -2079,8 +2057,6 @@ class Daemon(AuthJSONRPCServer):
|
|||
exclude_previous = True
|
||||
|
||||
d = self._upload_log(log_type=log_type, exclude_previous=exclude_previous, force=force)
|
||||
if 'message' in p.keys():
|
||||
d.addCallback(lambda _: self._log_to_slack(p['message']))
|
||||
d.addCallback(lambda _: self._render_response(True, OK_CODE))
|
||||
return d
|
||||
|
||||
|
@ -2626,3 +2602,13 @@ def handle_failure(err, msg):
|
|||
#
|
||||
# If so, maybe we should return something else.
|
||||
return server.failure
|
||||
|
||||
|
||||
def run_reflector_factory(factory):
|
||||
reflector_server = random.choice(conf.settings.reflector_servers)
|
||||
reflector_address, reflector_port = reflector_server
|
||||
log.info("Start reflector client")
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
return d
|
||||
|
|
|
@ -9,6 +9,7 @@ from twisted.internet import defer, reactor, error
|
|||
from twisted.cred import portal
|
||||
from jsonrpc.proxy import JSONRPCProxy
|
||||
|
||||
from lbrynet import analytics
|
||||
from lbrynet.lbrynet_daemon.auth.auth import PasswordChecker, HttpPasswordRealm
|
||||
from lbrynet.lbrynet_daemon.auth.util import initialize_api_key_file
|
||||
from lbrynet import conf
|
||||
|
@ -46,7 +47,7 @@ def start():
|
|||
parser.add_argument("--wallet",
|
||||
help="lbrycrd or lbryum, default lbryum",
|
||||
type=str,
|
||||
default='lbryum')
|
||||
default=conf.LBRYUM_WALLET)
|
||||
parser.add_argument("--ui", help="path to custom UI folder", default=None)
|
||||
parser.add_argument(
|
||||
"--branch",
|
||||
|
@ -98,7 +99,9 @@ def start():
|
|||
print "To quit press ctrl-c or call 'stop' via the API"
|
||||
|
||||
if test_internet_connection():
|
||||
start_server_and_listen(args.launchui, args.useauth)
|
||||
analytics_manager = analytics.Manager.new_instance()
|
||||
analytics_manager.send_server_startup()
|
||||
start_server_and_listen(args.launchui, args.useauth, analytics_manager)
|
||||
reactor.run()
|
||||
|
||||
if not args.logtoconsole and not args.quiet:
|
||||
|
@ -121,29 +124,32 @@ def update_settings_from_args(args):
|
|||
settings.update(to_pass)
|
||||
|
||||
|
||||
def log_and_kill(failure):
|
||||
def log_and_kill(failure, analytics_manager):
|
||||
analytics_manager.send_server_startup_error(failure.getErrorMessage() + " " + str(failure))
|
||||
log_support.failure(failure, log, 'Failed to startup: %s')
|
||||
reactor.callFromThread(reactor.stop)
|
||||
|
||||
|
||||
def start_server_and_listen(launchui, use_auth):
|
||||
def start_server_and_listen(launchui, use_auth, analytics_manager):
|
||||
"""The primary entry point for launching the daemon.
|
||||
|
||||
Args:
|
||||
launchui: set to true to open a browser window
|
||||
use_auth: set to true to enable http authentication
|
||||
analytics_manager: to send analytics
|
||||
kwargs: passed along to `DaemonServer().start()`
|
||||
"""
|
||||
lbry = DaemonServer()
|
||||
d = lbry.start()
|
||||
d.addCallback(lambda _: listen(lbry, use_auth))
|
||||
daemon_server = DaemonServer(analytics_manager)
|
||||
d = daemon_server.start()
|
||||
d.addCallback(lambda _: listen(daemon_server, use_auth))
|
||||
if launchui:
|
||||
d.addCallback(lambda _: webbrowser.open(settings.UI_ADDRESS))
|
||||
d.addErrback(log_and_kill)
|
||||
d.addCallback(lambda _: analytics_manager.send_server_startup_success())
|
||||
d.addErrback(log_and_kill, analytics_manager)
|
||||
|
||||
|
||||
def listen(lbry, use_auth):
|
||||
site_base = get_site_base(use_auth, lbry.root)
|
||||
def listen(daemon_server, use_auth):
|
||||
site_base = get_site_base(use_auth, daemon_server.root)
|
||||
lbrynet_server = server.Site(site_base)
|
||||
lbrynet_server.requestFactory = DaemonRequest
|
||||
try:
|
||||
|
|
|
@ -3,7 +3,6 @@ import os
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from lbrynet.conf import settings
|
||||
from lbrynet.lbrynet_daemon.Daemon import Daemon
|
||||
from lbrynet.lbrynet_daemon.Resources import LBRYindex, HostedEncryptedFile, EncryptedFileUpload
|
||||
from lbrynet.conf import settings
|
||||
|
@ -13,10 +12,14 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class DaemonServer(object):
|
||||
def __init__(self, analytics_manager=None):
|
||||
self.root = None
|
||||
self.analytics_manager = analytics_manager
|
||||
|
||||
def _setup_server(self):
|
||||
ui_path = os.path.join(settings.ensure_data_dir(), "lbry-ui", "active")
|
||||
self.root = LBRYindex(ui_path)
|
||||
self._api = Daemon(self.root)
|
||||
self._api = Daemon(self.root, self.analytics_manager)
|
||||
self.root.putChild("view", HostedEncryptedFile(self._api))
|
||||
self.root.putChild("upload", EncryptedFileUpload(self._api))
|
||||
self.root.putChild(settings.API_ADDRESS, self._api)
|
||||
|
|
|
@ -61,9 +61,11 @@ class UIManager(object):
|
|||
self.loaded_requirements = None
|
||||
|
||||
def setup(self, branch=None, check_requirements=None, user_specified=None):
|
||||
local_ui_path = settings.local_ui_path or user_specified
|
||||
self.branch = settings.ui_branch or branch
|
||||
self.check_requirements = settings.check_ui_requirements or check_requirements
|
||||
local_ui_path = user_specified or settings.local_ui_path
|
||||
|
||||
self.branch = branch or settings.ui_branch
|
||||
self.check_requirements = (check_requirements if check_requirements is not None
|
||||
else settings.check_ui_requirements)
|
||||
|
||||
if self._check_for_bundled_ui():
|
||||
return defer.succeed(True)
|
||||
|
@ -92,8 +94,14 @@ class UIManager(object):
|
|||
return d
|
||||
|
||||
def _check_for_bundled_ui(self):
|
||||
bundle_manager = BundledUIManager(self.root, self.active_dir, get_bundled_ui_path())
|
||||
return bundle_manager.setup()
|
||||
try:
|
||||
bundled_path = get_bundled_ui_path()
|
||||
except Exception:
|
||||
log.warning('Failed to get path for bundled UI', exc_info=True)
|
||||
return False
|
||||
else:
|
||||
bundle_manager = BundledUIManager(self.root, self.active_dir, bundled_path)
|
||||
return bundle_manager.setup()
|
||||
|
||||
def _up_to_date(self):
|
||||
def _get_git_info():
|
||||
|
|
|
@ -7,8 +7,8 @@ from twisted.python.failure import Failure
|
|||
|
||||
from txjsonrpc import jsonrpclib
|
||||
from lbrynet.core.Error import InvalidAuthenticationToken, InvalidHeaderError, SubhandlerError
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.conf import settings
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.lbrynet_daemon.auth.util import APIKey, get_auth_message
|
||||
from lbrynet.lbrynet_daemon.auth.client import LBRY_SECRET
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ NAME_ALLOWED_CHARSET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0987
|
|||
|
||||
|
||||
def verify_name_characters(name):
|
||||
assert len(name) > 0, "Empty uri"
|
||||
for c in name:
|
||||
assert c in NAME_ALLOWED_CHARSET, "Invalid character"
|
||||
return True
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
|
||||
from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory
|
||||
from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory
|
||||
from lbrynet.reflector import reupload
|
||||
from lbrynet.reflector.client.blob import BlobReflectorClientFactory as BlobClientFactory
|
||||
from lbrynet.reflector import reupload
|
||||
|
|
210
lbrynet/reflector/client/blob.py
Normal file
210
lbrynet/reflector/client/blob.py
Normal file
|
@ -0,0 +1,210 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from twisted.protocols.basic import FileSender
|
||||
from twisted.internet.protocol import Protocol, ClientFactory
|
||||
from twisted.internet import defer, error
|
||||
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.reflector.common import IncompleteResponse
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BlobReflectorClient(Protocol):
|
||||
# Protocol stuff
|
||||
|
||||
def connectionMade(self):
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.response_buff = ''
|
||||
self.outgoing_buff = ''
|
||||
self.blob_hashes_to_send = self.factory.blobs
|
||||
self.next_blob_to_send = None
|
||||
self.blob_read_handle = None
|
||||
self.received_handshake_response = False
|
||||
self.protocol_version = None
|
||||
self.file_sender = None
|
||||
self.producer = None
|
||||
self.streaming = False
|
||||
self.sent_blobs = False
|
||||
d = self.send_handshake()
|
||||
d.addErrback(
|
||||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
|
||||
def dataReceived(self, data):
|
||||
log.debug('Recieved %s', data)
|
||||
self.response_buff += data
|
||||
try:
|
||||
msg = self.parse_response(self.response_buff)
|
||||
except IncompleteResponse:
|
||||
pass
|
||||
else:
|
||||
self.response_buff = ''
|
||||
d = self.handle_response(msg)
|
||||
d.addCallback(lambda _: self.send_next_request())
|
||||
d.addErrback(self.response_failure_handler)
|
||||
|
||||
def connectionLost(self, reason):
|
||||
if reason.check(error.ConnectionDone):
|
||||
self.factory.sent_blobs = self.sent_blobs
|
||||
if self.factory.sent_blobs:
|
||||
log.info('Finished sending data via reflector')
|
||||
self.factory.finished_deferred.callback(True)
|
||||
else:
|
||||
log.info('Reflector finished: %s', reason)
|
||||
self.factory.finished_deferred.callback(reason)
|
||||
|
||||
# IConsumer stuff
|
||||
|
||||
def registerProducer(self, producer, streaming):
|
||||
self.producer = producer
|
||||
self.streaming = streaming
|
||||
if self.streaming is False:
|
||||
from twisted.internet import reactor
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def unregisterProducer(self):
|
||||
self.producer = None
|
||||
|
||||
def write(self, data):
|
||||
self.transport.write(data)
|
||||
if self.producer is not None and self.streaming is False:
|
||||
from twisted.internet import reactor
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def send_handshake(self):
|
||||
log.debug('Sending handshake')
|
||||
self.write(json.dumps({'version': 0}))
|
||||
return defer.succeed(None)
|
||||
|
||||
def parse_response(self, buff):
|
||||
try:
|
||||
return json.loads(buff)
|
||||
except ValueError:
|
||||
raise IncompleteResponse()
|
||||
|
||||
def response_failure_handler(self, err):
|
||||
log.warning("An error occurred handling the response: %s", err.getTraceback())
|
||||
|
||||
def handle_response(self, response_dict):
|
||||
if self.received_handshake_response is False:
|
||||
return self.handle_handshake_response(response_dict)
|
||||
else:
|
||||
return self.handle_normal_response(response_dict)
|
||||
|
||||
def set_not_uploading(self):
|
||||
if self.next_blob_to_send is not None:
|
||||
self.next_blob_to_send.close_read_handle(self.read_handle)
|
||||
self.read_handle = None
|
||||
self.next_blob_to_send = None
|
||||
self.file_sender = None
|
||||
return defer.succeed(None)
|
||||
|
||||
def start_transfer(self):
|
||||
self.sent_blobs = True
|
||||
self.write(json.dumps({}))
|
||||
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
|
||||
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
||||
return d
|
||||
|
||||
def handle_handshake_response(self, response_dict):
|
||||
if 'version' not in response_dict:
|
||||
raise ValueError("Need protocol version number!")
|
||||
self.protocol_version = int(response_dict['version'])
|
||||
if self.protocol_version != 0:
|
||||
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
|
||||
self.received_handshake_response = True
|
||||
return defer.succeed(True)
|
||||
|
||||
def handle_normal_response(self, response_dict):
|
||||
if self.file_sender is None: # Expecting Server Info Response
|
||||
if 'send_blob' not in response_dict:
|
||||
raise ValueError("I don't know whether to send the blob or not!")
|
||||
if response_dict['send_blob'] is True:
|
||||
self.file_sender = FileSender()
|
||||
return defer.succeed(True)
|
||||
else:
|
||||
return self.set_not_uploading()
|
||||
else: # Expecting Server Blob Response
|
||||
if 'received_blob' not in response_dict:
|
||||
raise ValueError("I don't know if the blob made it to the intended destination!")
|
||||
else:
|
||||
return self.set_not_uploading()
|
||||
|
||||
def open_blob_for_reading(self, blob):
|
||||
if blob.is_validated():
|
||||
read_handle = blob.open_for_reading()
|
||||
if read_handle is not None:
|
||||
log.debug('Getting ready to send %s', blob.blob_hash)
|
||||
self.next_blob_to_send = blob
|
||||
self.read_handle = read_handle
|
||||
return None
|
||||
raise ValueError(
|
||||
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
|
||||
|
||||
def send_blob_info(self):
|
||||
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
|
||||
log.debug('sending blob info')
|
||||
self.write(json.dumps({
|
||||
'blob_hash': self.next_blob_to_send.blob_hash,
|
||||
'blob_size': self.next_blob_to_send.length
|
||||
}))
|
||||
|
||||
def log_fail_and_disconnect(self, err, blob_hash):
|
||||
log_support.failure(err, log, "Error reflecting blob %s: %s", blob_hash)
|
||||
self.transport.loseConnection()
|
||||
|
||||
def send_next_request(self):
|
||||
if self.file_sender is not None:
|
||||
# send the blob
|
||||
log.debug('Sending the blob')
|
||||
return self.start_transfer()
|
||||
elif self.blob_hashes_to_send:
|
||||
# open the next blob to send
|
||||
blob_hash = self.blob_hashes_to_send[0]
|
||||
log.debug('No current blob, sending the next one: %s', blob_hash)
|
||||
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
|
||||
d = self.blob_manager.get_blob(blob_hash, True)
|
||||
d.addCallback(self.open_blob_for_reading)
|
||||
# send the server the next blob hash + length
|
||||
d.addCallbacks(
|
||||
lambda _: self.send_blob_info(),
|
||||
lambda err: self.log_fail_and_disconnect(err, blob_hash))
|
||||
return d
|
||||
else:
|
||||
# close connection
|
||||
log.debug('No more blob hashes, closing connection')
|
||||
self.transport.loseConnection()
|
||||
|
||||
|
||||
class BlobReflectorClientFactory(ClientFactory):
|
||||
protocol = BlobReflectorClient
|
||||
|
||||
def __init__(self, blob_manager, blobs):
|
||||
self.blob_manager = blob_manager
|
||||
self.blobs = blobs
|
||||
self.p = None
|
||||
self.sent_blobs = False
|
||||
self.finished_deferred = defer.Deferred()
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
p = self.protocol()
|
||||
p.factory = self
|
||||
self.p = p
|
||||
return p
|
||||
|
||||
def startFactory(self):
|
||||
log.debug('Starting reflector factory')
|
||||
ClientFactory.startFactory(self)
|
||||
|
||||
def startedConnecting(self, connector):
|
||||
log.debug('Started connecting')
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
"""If we get disconnected, reconnect to server."""
|
||||
log.debug("connection lost: %s", reason.getErrorMessage())
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
log.debug("connection failed: %s", reason.getErrorMessage())
|
|
@ -49,22 +49,19 @@ Client may now send another Client Info Request
|
|||
"""
|
||||
import json
|
||||
import logging
|
||||
|
||||
from twisted.protocols.basic import FileSender
|
||||
from twisted.internet.protocol import Protocol, ClientFactory
|
||||
from twisted.internet import defer, error
|
||||
|
||||
from lbrynet.reflector.common import IncompleteResponse
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class IncompleteResponseError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class EncryptedFileReflectorClient(Protocol):
|
||||
|
||||
# Protocol stuff
|
||||
|
||||
def connectionMade(self):
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.response_buff = ''
|
||||
|
@ -79,14 +76,15 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
self.streaming = False
|
||||
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
|
||||
d.addCallback(lambda _: self.send_handshake())
|
||||
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
d.addErrback(
|
||||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
|
||||
def dataReceived(self, data):
|
||||
log.debug('Recieved %s', data)
|
||||
self.response_buff += data
|
||||
try:
|
||||
msg = self.parse_response(self.response_buff)
|
||||
except IncompleteResponseError:
|
||||
except IncompleteResponse:
|
||||
pass
|
||||
else:
|
||||
self.response_buff = ''
|
||||
|
@ -149,7 +147,7 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
try:
|
||||
return json.loads(buff)
|
||||
except ValueError:
|
||||
raise IncompleteResponseError()
|
||||
raise IncompleteResponse()
|
||||
|
||||
def response_failure_handler(self, err):
|
||||
log.warning("An error occurred handling the response: %s", err.getTraceback())
|
||||
|
@ -206,7 +204,8 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
self.next_blob_to_send = blob
|
||||
self.read_handle = read_handle
|
||||
return None
|
||||
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
|
||||
raise ValueError(
|
||||
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
|
||||
|
||||
def send_blob_info(self):
|
||||
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
|
@ -254,200 +253,6 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
|
|||
self.p = p
|
||||
return p
|
||||
|
||||
def startFactory(self):
|
||||
log.debug('Starting reflector factory')
|
||||
ClientFactory.startFactory(self)
|
||||
|
||||
def startedConnecting(self, connector):
|
||||
log.debug('Started connecting')
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
"""If we get disconnected, reconnect to server."""
|
||||
log.debug("connection lost: %s", reason)
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
log.debug("connection failed: %s", reason)
|
||||
|
||||
|
||||
class BlobReflectorClient(Protocol):
|
||||
# Protocol stuff
|
||||
|
||||
def connectionMade(self):
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.response_buff = ''
|
||||
self.outgoing_buff = ''
|
||||
self.blob_hashes_to_send = self.factory.blobs
|
||||
self.next_blob_to_send = None
|
||||
self.blob_read_handle = None
|
||||
self.received_handshake_response = False
|
||||
self.protocol_version = None
|
||||
self.file_sender = None
|
||||
self.producer = None
|
||||
self.streaming = False
|
||||
self.sent_blobs = False
|
||||
d = self.send_handshake()
|
||||
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
|
||||
def dataReceived(self, data):
|
||||
log.debug('Recieved %s', data)
|
||||
self.response_buff += data
|
||||
try:
|
||||
msg = self.parse_response(self.response_buff)
|
||||
except IncompleteResponseError:
|
||||
pass
|
||||
else:
|
||||
self.response_buff = ''
|
||||
d = self.handle_response(msg)
|
||||
d.addCallback(lambda _: self.send_next_request())
|
||||
d.addErrback(self.response_failure_handler)
|
||||
|
||||
def connectionLost(self, reason):
|
||||
if reason.check(error.ConnectionDone):
|
||||
self.factory.sent_blobs = self.sent_blobs
|
||||
if self.factory.sent_blobs:
|
||||
log.info('Finished sending data via reflector')
|
||||
self.factory.finished_deferred.callback(True)
|
||||
else:
|
||||
log.info('Reflector finished: %s', reason)
|
||||
self.factory.finished_deferred.callback(reason)
|
||||
|
||||
# IConsumer stuff
|
||||
|
||||
def registerProducer(self, producer, streaming):
|
||||
self.producer = producer
|
||||
self.streaming = streaming
|
||||
if self.streaming is False:
|
||||
from twisted.internet import reactor
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def unregisterProducer(self):
|
||||
self.producer = None
|
||||
|
||||
def write(self, data):
|
||||
self.transport.write(data)
|
||||
if self.producer is not None and self.streaming is False:
|
||||
from twisted.internet import reactor
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def send_handshake(self):
|
||||
log.debug('Sending handshake')
|
||||
self.write(json.dumps({'version': 0}))
|
||||
return defer.succeed(None)
|
||||
|
||||
def parse_response(self, buff):
|
||||
try:
|
||||
return json.loads(buff)
|
||||
except ValueError:
|
||||
raise IncompleteResponseError()
|
||||
|
||||
def response_failure_handler(self, err):
|
||||
log.warning("An error occurred handling the response: %s", err.getTraceback())
|
||||
|
||||
def handle_response(self, response_dict):
|
||||
if self.received_handshake_response is False:
|
||||
return self.handle_handshake_response(response_dict)
|
||||
else:
|
||||
return self.handle_normal_response(response_dict)
|
||||
|
||||
def set_not_uploading(self):
|
||||
if self.next_blob_to_send is not None:
|
||||
self.next_blob_to_send.close_read_handle(self.read_handle)
|
||||
self.read_handle = None
|
||||
self.next_blob_to_send = None
|
||||
self.file_sender = None
|
||||
return defer.succeed(None)
|
||||
|
||||
def start_transfer(self):
|
||||
self.sent_blobs = True
|
||||
self.write(json.dumps({}))
|
||||
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
|
||||
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
||||
return d
|
||||
|
||||
def handle_handshake_response(self, response_dict):
|
||||
if 'version' not in response_dict:
|
||||
raise ValueError("Need protocol version number!")
|
||||
self.protocol_version = int(response_dict['version'])
|
||||
if self.protocol_version != 0:
|
||||
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
|
||||
self.received_handshake_response = True
|
||||
return defer.succeed(True)
|
||||
|
||||
def handle_normal_response(self, response_dict):
|
||||
if self.file_sender is None: # Expecting Server Info Response
|
||||
if 'send_blob' not in response_dict:
|
||||
raise ValueError("I don't know whether to send the blob or not!")
|
||||
if response_dict['send_blob'] is True:
|
||||
self.file_sender = FileSender()
|
||||
return defer.succeed(True)
|
||||
else:
|
||||
return self.set_not_uploading()
|
||||
else: # Expecting Server Blob Response
|
||||
if 'received_blob' not in response_dict:
|
||||
raise ValueError("I don't know if the blob made it to the intended destination!")
|
||||
else:
|
||||
return self.set_not_uploading()
|
||||
|
||||
def open_blob_for_reading(self, blob):
|
||||
if blob.is_validated():
|
||||
read_handle = blob.open_for_reading()
|
||||
if read_handle is not None:
|
||||
log.debug('Getting ready to send %s', blob.blob_hash)
|
||||
self.next_blob_to_send = blob
|
||||
self.read_handle = read_handle
|
||||
return None
|
||||
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
|
||||
|
||||
def send_blob_info(self):
|
||||
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
|
||||
log.debug('sending blob info')
|
||||
self.write(json.dumps({
|
||||
'blob_hash': self.next_blob_to_send.blob_hash,
|
||||
'blob_size': self.next_blob_to_send.length
|
||||
}))
|
||||
|
||||
def log_fail_and_disconnect(self, err, blob_hash):
|
||||
log.error("Error reflecting blob %s", blob_hash)
|
||||
self.transport.loseConnection()
|
||||
|
||||
def send_next_request(self):
|
||||
if self.file_sender is not None:
|
||||
# send the blob
|
||||
log.debug('Sending the blob')
|
||||
return self.start_transfer()
|
||||
elif self.blob_hashes_to_send:
|
||||
# open the next blob to send
|
||||
blob_hash = self.blob_hashes_to_send[0]
|
||||
log.debug('No current blob, sending the next one: %s', blob_hash)
|
||||
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
|
||||
d = self.blob_manager.get_blob(blob_hash, True)
|
||||
d.addCallback(self.open_blob_for_reading)
|
||||
# send the server the next blob hash + length
|
||||
d.addCallbacks(lambda _: self.send_blob_info(), lambda err: self.log_fail_and_disconnect(err, blob_hash))
|
||||
return d
|
||||
else:
|
||||
# close connection
|
||||
log.debug('No more blob hashes, closing connection')
|
||||
self.transport.loseConnection()
|
||||
|
||||
|
||||
class BlobReflectorClientFactory(ClientFactory):
|
||||
protocol = BlobReflectorClient
|
||||
|
||||
def __init__(self, blob_manager, blobs):
|
||||
self.blob_manager = blob_manager
|
||||
self.blobs = blobs
|
||||
self.p = None
|
||||
self.sent_blobs = False
|
||||
self.finished_deferred = defer.Deferred()
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
p = self.protocol()
|
||||
p.factory = self
|
||||
self.p = p
|
||||
return p
|
||||
|
||||
def startFactory(self):
|
||||
log.debug('Starting reflector factory')
|
||||
ClientFactory.startFactory(self)
|
||||
|
|
2
lbrynet/reflector/common.py
Normal file
2
lbrynet/reflector/common.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
class IncompleteResponse(Exception):
|
||||
pass
|
|
@ -22,6 +22,7 @@ if not os.path.isfile(lbrycrdd_path_conf):
|
|||
f.close()
|
||||
|
||||
from lbrynet.lbrynet_daemon import DaemonControl
|
||||
from lbrynet import analytics
|
||||
from lbrynet.conf import settings
|
||||
from lbrynet.core import utils
|
||||
|
||||
|
@ -62,7 +63,10 @@ class LBRYDaemonApp(AppKit.NSApplication):
|
|||
notify("LBRY needs an internet connection to start, try again when one is available")
|
||||
sys.exit(0)
|
||||
|
||||
DaemonControl.start_server_and_listen(launchui=True, use_auth=False)
|
||||
DaemonControl.start_server_and_listen(
|
||||
launchui=True, use_auth=False,
|
||||
analytics_manager=analytics.Manager.new_instance()
|
||||
)
|
||||
|
||||
def openui_(self, sender):
|
||||
webbrowser.open(settings.UI_ADDRESS)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
# Configure the library for a non-production release
|
||||
# Configure build-specific things
|
||||
#
|
||||
|
||||
set -euo pipefail
|
||||
|
@ -13,12 +13,23 @@ add_ui() {
|
|||
wget https://s3.amazonaws.com/lbry-ui/development/data.json -O lbrynet/resources/ui/data.json
|
||||
}
|
||||
|
||||
IS_RC_REGEX="v[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+-rc[[:digit:]]+"
|
||||
set_build() {
|
||||
local file="lbrynet/build_type.py"
|
||||
# cannot use 'sed -i' because BSD sed and GNU sed are incompatible
|
||||
sed 's/^\(BUILD = "\)[^"]\+\(".*\)$/\1'"${1}"'\2/' "$file" > tmpbuildfile
|
||||
mv -- tmpbuildfile "$file"
|
||||
}
|
||||
|
||||
IS_RC_REGEX="v[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+rc[[:digit:]]+"
|
||||
|
||||
if [[ -z "$TRAVIS_TAG" ]]; then
|
||||
python packaging/append_sha_to_version.py lbrynet/__init__.py "${TRAVIS_COMMIT}"
|
||||
add_ui
|
||||
set_build "qa"
|
||||
elif [[ "$TRAVIS_TAG" =~ $IS_RC_REGEX ]]; then
|
||||
# If the tag looks like v0.7.6-rc0 then this is a tagged release candidate.
|
||||
# If the tag looks like v0.7.6rc0 then this is a tagged release candidate.
|
||||
add_ui
|
||||
set_build "rc"
|
||||
else
|
||||
set_build "release"
|
||||
fi
|
|
@ -1,5 +1,5 @@
|
|||
[Desktop Entry]
|
||||
Version=0.7.6rc0
|
||||
Version=0.7.6
|
||||
Name=LBRY
|
||||
Comment=The world's first user-owned content marketplace
|
||||
Icon=lbry
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# this is a port of setup_qa.sh used for the unix platforms
|
||||
# this is a port of setup_build.sh used for the unix platforms
|
||||
|
||||
function AddUi {
|
||||
wget https://s3.amazonaws.com/lbry-ui/development/dist.zip -OutFile dist.zip
|
||||
|
@ -11,14 +11,25 @@ function AddUi {
|
|||
if ($LastExitCode -ne 0) { $host.SetShouldExit($LastExitCode) }
|
||||
}
|
||||
|
||||
function SetBuild([string]$build) {
|
||||
(Get-Content lbrynet\build_type.py).replace('dev', $build) | Set-Content lbrynet\build_type.py
|
||||
if ($LastExitCode -ne 0) { $host.SetShouldExit($LastExitCode) }
|
||||
}
|
||||
|
||||
If (${Env:APPVEYOR_REPO_TAG} -NotMatch "true") {
|
||||
C:\Python27\python.exe packaging\append_sha_to_version.py lbrynet\__init__.py ${Env:APPVEYOR_REPO_COMMIT}
|
||||
if ($LastExitCode -ne 0) { $host.SetShouldExit($LastExitCode) }
|
||||
|
||||
AddUi
|
||||
SetBuild "qa"
|
||||
}
|
||||
ElseIf (${Env:APPVEYOR_REPO_TAG_NAME} -Match "v\d+\.\d+\.\d+-rc\d+") {
|
||||
ElseIf (${Env:APPVEYOR_REPO_TAG_NAME} -Match "v\d+\.\d+\.\d+rc\d+") {
|
||||
# If the tag looks like v0.7.6rc0 then this is a tagged release candidate.
|
||||
AddUi
|
||||
SetBuild "rc"
|
||||
}
|
||||
Else {
|
||||
SetBuild "release"
|
||||
}
|
||||
|
||||
C:\Python27\python.exe setup.py build bdist_msi
|
||||
|
|
|
@ -36,8 +36,9 @@ C:\Python27\Scripts\pip.exe install requests==2.9.1
|
|||
|
||||
C:\Python27\Scripts\pip.exe install zope.interface==4.1.3
|
||||
|
||||
# this includes a patch to allow version numbers with non-integer values
|
||||
C:\Python27\Scripts\pip.exe install https://bitbucket.org/jobevers/cx_freeze/get/tip.tar.gz
|
||||
# this is a patched to allow version numbers with non-integer values
|
||||
# and it is branched off of 4.3.3
|
||||
C:\Python27\Scripts\pip.exe install https://bitbucket.org/jobevers/cx_freeze/get/handle-version.tar.gz
|
||||
|
||||
C:\Python27\Scripts\pip.exe install cython==0.24.1
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ try:
|
|||
except ImportError:
|
||||
import win32gui
|
||||
|
||||
from lbrynet import conf
|
||||
from lbrynet import conf, analytics
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.core import utils
|
||||
from lbrynet.lbrynet_daemon import DaemonControl
|
||||
|
@ -267,7 +267,10 @@ def main(lbry_name=None):
|
|||
systray_thread.daemon = True
|
||||
systray_thread.start()
|
||||
|
||||
DaemonControl.start_server_and_listen(launchui=True, use_auth=False)
|
||||
DaemonControl.start_server_and_listen(
|
||||
launchui=True, use_auth=False,
|
||||
analytics_manager=analytics.Manager.new_instance()
|
||||
)
|
||||
reactor.run()
|
||||
|
||||
|
||||
|
|
|
@ -2,7 +2,21 @@ from lbrynet.metadata import Metadata
|
|||
from twisted.trial import unittest
|
||||
from jsonschema import ValidationError
|
||||
|
||||
|
||||
class MetadataTest(unittest.TestCase):
|
||||
def test_name_error_if_blank(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
Metadata.verify_name_characters("")
|
||||
|
||||
def test_name_error_if_contains_bad_chrs(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
Metadata.verify_name_characters("wu tang")
|
||||
with self.assertRaises(AssertionError):
|
||||
Metadata.verify_name_characters("$wutang")
|
||||
with self.assertRaises(AssertionError):
|
||||
Metadata.verify_name_characters("#wutang")
|
||||
|
||||
|
||||
def test_validation_error_if_no_metadata(self):
|
||||
metadata = {}
|
||||
with self.assertRaises(ValidationError):
|
||||
|
|
Loading…
Reference in a new issue