forked from LBRYCommunity/lbry-sdk
remove Settings.py
remove old settings class, which was only used for the lbryid and required unqlite. this allows the daemon startup to be simplified, it previously relied on Settings in a few places. lbryid had been stored in the old settings, this initializes it at startup.
This commit is contained in:
parent
a8cb255d24
commit
09846413bd
2 changed files with 22 additions and 205 deletions
|
@ -33,7 +33,6 @@ from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadat
|
||||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
|
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
|
||||||
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
|
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
|
||||||
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
|
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
|
||||||
from lbrynet.lbrynet_daemon.Settings import Settings
|
|
||||||
from lbrynet.lbrynet_daemon.UIManager import UIManager
|
from lbrynet.lbrynet_daemon.UIManager import UIManager
|
||||||
from lbrynet.lbrynet_daemon.Downloader import GetStream
|
from lbrynet.lbrynet_daemon.Downloader import GetStream
|
||||||
from lbrynet.lbrynet_daemon.Publisher import Publisher
|
from lbrynet.lbrynet_daemon.Publisher import Publisher
|
||||||
|
@ -266,7 +265,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
self.log_uploader = log_support.LogUploader.load('lbrynet', self.log_file)
|
self.log_uploader = log_support.LogUploader.load('lbrynet', self.log_file)
|
||||||
|
|
||||||
self.analytics_manager = analytics_manager
|
self.analytics_manager = analytics_manager
|
||||||
self.lbryid = PENDING_LBRY_ID
|
self.lbryid = utils.generate_id()
|
||||||
self.daemon_conf = conf.settings.get_conf_filename()
|
self.daemon_conf = conf.settings.get_conf_filename()
|
||||||
|
|
||||||
self.wallet_user = None
|
self.wallet_user = None
|
||||||
|
@ -286,14 +285,13 @@ class Daemon(AuthJSONRPCServer):
|
||||||
self.looping_call_manager = LoopingCallManager(calls)
|
self.looping_call_manager = LoopingCallManager(calls)
|
||||||
self.sd_identifier = StreamDescriptorIdentifier()
|
self.sd_identifier = StreamDescriptorIdentifier()
|
||||||
self.stream_info_manager = TempEncryptedFileMetadataManager()
|
self.stream_info_manager = TempEncryptedFileMetadataManager()
|
||||||
self.settings = Settings(self.db_dir)
|
|
||||||
self.lbry_ui_manager = UIManager(root)
|
self.lbry_ui_manager = UIManager(root)
|
||||||
self.blob_request_payment_rate_manager = None
|
|
||||||
self.lbry_file_metadata_manager = None
|
self.lbry_file_metadata_manager = None
|
||||||
self.lbry_file_manager = None
|
self.lbry_file_manager = None
|
||||||
|
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
|
self._modify_loggly_formatter()
|
||||||
|
|
||||||
def _log_starting_vals():
|
def _log_starting_vals():
|
||||||
log.info("Starting balance: " + str(self.session.wallet.wallet_balance))
|
log.info("Starting balance: " + str(self.session.wallet.wallet_balance))
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
@ -338,7 +336,6 @@ class Daemon(AuthJSONRPCServer):
|
||||||
d.addCallback(lambda _: self._initial_setup())
|
d.addCallback(lambda _: self._initial_setup())
|
||||||
d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory))
|
d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory))
|
||||||
d.addCallback(lambda _: self._check_db_migration())
|
d.addCallback(lambda _: self._check_db_migration())
|
||||||
d.addCallback(lambda _: self._get_settings())
|
|
||||||
d.addCallback(lambda _: self._load_caches())
|
d.addCallback(lambda _: self._load_caches())
|
||||||
d.addCallback(lambda _: self._set_events())
|
d.addCallback(lambda _: self._set_events())
|
||||||
d.addCallback(lambda _: self._get_session())
|
d.addCallback(lambda _: self._get_session())
|
||||||
|
@ -516,17 +513,10 @@ class Daemon(AuthJSONRPCServer):
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
def _setup_server(self):
|
def _setup_server(self):
|
||||||
def restore_running_status(running):
|
self.startup_status = STARTUP_STAGES[4]
|
||||||
if running is True:
|
|
||||||
d = self._start_server()
|
d = self._start_server()
|
||||||
d.addCallback(lambda _: self._start_reflector())
|
d.addCallback(lambda _: self._start_reflector())
|
||||||
return defer.succeed(True)
|
return d
|
||||||
|
|
||||||
self.startup_status = STARTUP_STAGES[4]
|
|
||||||
|
|
||||||
dl = self.settings.get_server_running_status()
|
|
||||||
dl.addCallback(restore_running_status)
|
|
||||||
return dl
|
|
||||||
|
|
||||||
def _setup_query_handlers(self):
|
def _setup_query_handlers(self):
|
||||||
handlers = [
|
handlers = [
|
||||||
|
@ -538,42 +528,21 @@ class Daemon(AuthJSONRPCServer):
|
||||||
),
|
),
|
||||||
self.session.wallet.get_wallet_info_query_handler_factory(),
|
self.session.wallet.get_wallet_info_query_handler_factory(),
|
||||||
]
|
]
|
||||||
|
return self._add_query_handlers(handlers)
|
||||||
def get_blob_request_handler_factory(rate):
|
|
||||||
self.blob_request_payment_rate_manager = self.session.payment_rate_manager
|
|
||||||
|
|
||||||
d1 = self.settings.get_server_data_payment_rate()
|
|
||||||
d1.addCallback(get_blob_request_handler_factory)
|
|
||||||
|
|
||||||
dl = defer.DeferredList([d1])
|
|
||||||
dl.addCallback(lambda _: self._add_query_handlers(handlers))
|
|
||||||
return dl
|
|
||||||
|
|
||||||
def _add_query_handlers(self, query_handlers):
|
def _add_query_handlers(self, query_handlers):
|
||||||
def _set_query_handlers(statuses):
|
|
||||||
from future_builtins import zip
|
|
||||||
for handler, (success, status) in zip(query_handlers, statuses):
|
|
||||||
if success is True:
|
|
||||||
self.query_handlers[handler] = status
|
|
||||||
|
|
||||||
ds = []
|
|
||||||
for handler in query_handlers:
|
for handler in query_handlers:
|
||||||
query_id = handler.get_primary_query_identifier()
|
query_id = handler.get_primary_query_identifier()
|
||||||
ds.append(self.settings.get_query_handler_status(query_id))
|
self.query_handlers[query_id] = handler
|
||||||
dl = defer.DeferredList(ds)
|
return defer.succeed(None)
|
||||||
dl.addCallback(_set_query_handlers)
|
|
||||||
return dl
|
|
||||||
|
|
||||||
def _upload_log(self, log_type=None, exclude_previous=False, force=False):
|
def _upload_log(self, log_type=None, exclude_previous=False, force=False):
|
||||||
if self.upload_log or force:
|
if self.upload_log or force:
|
||||||
if self.lbryid is not PENDING_LBRY_ID:
|
lbry_id = base58.b58encode(self.lbryid)[:SHORT_LBRY_ID_LEN]
|
||||||
id_hash = base58.b58encode(self.lbryid)[:SHORT_LBRY_ID_LEN]
|
|
||||||
else:
|
|
||||||
id_hash = self.lbryid
|
|
||||||
try:
|
try:
|
||||||
self.log_uploader.upload(exclude_previous, self.lbryid, log_type)
|
self.log_uploader.upload(exclude_previous, lbry_id, log_type)
|
||||||
except requests.RequestException:
|
except requests.RequestException:
|
||||||
log.exception('Failed to upload log file')
|
log.warning('Failed to upload log file')
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
def _clean_up_temp_files(self):
|
def _clean_up_temp_files(self):
|
||||||
|
@ -700,26 +669,6 @@ class Daemon(AuthJSONRPCServer):
|
||||||
return d
|
return d
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
def _get_settings(self):
|
|
||||||
d = self.settings.start()
|
|
||||||
d.addCallback(lambda _: self.settings.get_lbryid())
|
|
||||||
d.addCallback(self._set_lbryid)
|
|
||||||
d.addCallback(lambda _: self._modify_loggly_formatter())
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _set_lbryid(self, lbryid):
|
|
||||||
if lbryid is PENDING_LBRY_ID or lbryid is None:
|
|
||||||
return self._make_set_and_save_lbryid()
|
|
||||||
else:
|
|
||||||
log.info("LBRY ID: " + base58.b58encode(lbryid))
|
|
||||||
self.lbryid = lbryid
|
|
||||||
|
|
||||||
def _make_set_and_save_lbryid(self):
|
|
||||||
self.lbryid = utils.generate_id()
|
|
||||||
log.info("Generated new LBRY ID: " + base58.b58encode(self.lbryid))
|
|
||||||
d = self.settings.save_lbryid(self.lbryid)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _modify_loggly_formatter(self):
|
def _modify_loggly_formatter(self):
|
||||||
log_support.configure_loggly_handler(
|
log_support.configure_loggly_handler(
|
||||||
lbry_id=base58.b58encode(self.lbryid),
|
lbry_id=base58.b58encode(self.lbryid),
|
||||||
|
@ -759,12 +708,6 @@ class Daemon(AuthJSONRPCServer):
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_session(self):
|
def _get_session(self):
|
||||||
def get_default_data_rate():
|
|
||||||
d = self.settings.get_default_data_payment_rate()
|
|
||||||
d.addCallback(lambda rate: {"default_data_payment_rate": rate if rate is not None else
|
|
||||||
conf.settings.data_rate})
|
|
||||||
return d
|
|
||||||
|
|
||||||
def get_wallet():
|
def get_wallet():
|
||||||
if self.wallet_type == LBRYCRD_WALLET:
|
if self.wallet_type == LBRYCRD_WALLET:
|
||||||
raise ValueError('LBRYcrd Wallet is no longer supported')
|
raise ValueError('LBRYcrd Wallet is no longer supported')
|
||||||
|
@ -773,28 +716,18 @@ class Daemon(AuthJSONRPCServer):
|
||||||
config = {'auto_connect': True}
|
config = {'auto_connect': True}
|
||||||
if conf.settings.lbryum_wallet_dir:
|
if conf.settings.lbryum_wallet_dir:
|
||||||
config['lbryum_path'] = conf.settings.lbryum_wallet_dir
|
config['lbryum_path'] = conf.settings.lbryum_wallet_dir
|
||||||
d = defer.succeed(LBRYumWallet(self.db_dir, config))
|
return defer.succeed(LBRYumWallet(self.db_dir, config))
|
||||||
elif self.wallet_type == PTC_WALLET:
|
elif self.wallet_type == PTC_WALLET:
|
||||||
log.info("Using PTC wallet")
|
log.info("Using PTC wallet")
|
||||||
d = defer.succeed(PTCWallet(self.db_dir))
|
return defer.succeed(PTCWallet(self.db_dir))
|
||||||
else:
|
else:
|
||||||
raise ValueError('Wallet Type {} is not valid'.format(self.wallet_type))
|
raise ValueError('Wallet Type {} is not valid'.format(self.wallet_type))
|
||||||
d.addCallback(lambda w: {"wallet": w})
|
|
||||||
return d
|
|
||||||
|
|
||||||
d1 = get_default_data_rate()
|
d = get_wallet()
|
||||||
d2 = get_wallet()
|
|
||||||
|
|
||||||
def combine_results(results):
|
def create_session(wallet):
|
||||||
r = {}
|
|
||||||
for success, result in results:
|
|
||||||
if success is True:
|
|
||||||
r.update(result)
|
|
||||||
return r
|
|
||||||
|
|
||||||
def create_session(results):
|
|
||||||
self.session = Session(
|
self.session = Session(
|
||||||
results['default_data_payment_rate'],
|
conf.settings.data_rate,
|
||||||
db_dir=self.db_dir,
|
db_dir=self.db_dir,
|
||||||
lbryid=self.lbryid,
|
lbryid=self.lbryid,
|
||||||
blob_dir=self.blobfile_dir,
|
blob_dir=self.blobfile_dir,
|
||||||
|
@ -802,17 +735,15 @@ class Daemon(AuthJSONRPCServer):
|
||||||
known_dht_nodes=conf.settings.known_dht_nodes,
|
known_dht_nodes=conf.settings.known_dht_nodes,
|
||||||
peer_port=self.peer_port,
|
peer_port=self.peer_port,
|
||||||
use_upnp=self.use_upnp,
|
use_upnp=self.use_upnp,
|
||||||
wallet=results['wallet'],
|
wallet=wallet,
|
||||||
is_generous=conf.settings.is_generous_host
|
is_generous=conf.settings.is_generous_host
|
||||||
)
|
)
|
||||||
self.startup_status = STARTUP_STAGES[2]
|
self.startup_status = STARTUP_STAGES[2]
|
||||||
|
|
||||||
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
|
d.addCallback(create_session)
|
||||||
dl.addCallback(combine_results)
|
d.addCallback(lambda _: self.session.setup())
|
||||||
dl.addCallback(create_session)
|
|
||||||
dl.addCallback(lambda _: self.session.setup())
|
|
||||||
|
|
||||||
return dl
|
return d
|
||||||
|
|
||||||
def _setup_stream_identifier(self):
|
def _setup_stream_identifier(self):
|
||||||
file_saver_factory = EncryptedFileSaverFactory(
|
file_saver_factory = EncryptedFileSaverFactory(
|
||||||
|
@ -953,7 +884,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
reactor.callLater(self.search_timeout, _check_est, d)
|
reactor.callLater(self.search_timeout, _check_est, d)
|
||||||
d.addCallback(
|
d.addCallback(
|
||||||
lambda _: download_sd_blob(
|
lambda _: download_sd_blob(
|
||||||
self.session, sd_hash, self.blob_request_payment_rate_manager))
|
self.session, sd_hash, self.session.payment_rate_manager))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def get_or_download_sd_blob(self, sd_hash):
|
def get_or_download_sd_blob(self, sd_hash):
|
||||||
|
|
|
@ -1,114 +0,0 @@
|
||||||
import binascii
|
|
||||||
import functools
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
|
|
||||||
from twisted.internet import threads, defer
|
|
||||||
import unqlite
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def run_in_thread(fn):
|
|
||||||
@functools.wraps(fn)
|
|
||||||
def wrapped(*args, **kwargs):
|
|
||||||
return threads.deferToThread(fn, *args, **kwargs)
|
|
||||||
return wrapped
|
|
||||||
|
|
||||||
|
|
||||||
class Settings(object):
|
|
||||||
NAME = "settings.db"
|
|
||||||
def __init__(self, db_dir):
|
|
||||||
self.db_dir = db_dir
|
|
||||||
self.db = None
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
return self._open_db()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.db.close()
|
|
||||||
self.db = None
|
|
||||||
return defer.succeed(True)
|
|
||||||
|
|
||||||
def _open_db(self):
|
|
||||||
filename = os.path.join(self.db_dir, self.NAME)
|
|
||||||
log.debug("Opening %s as the settings database", filename)
|
|
||||||
self.db = unqlite.UnQLite(filename)
|
|
||||||
return defer.succeed(True)
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def save_lbryid(self, lbryid):
|
|
||||||
self.db['lbryid'] = binascii.hexlify(lbryid)
|
|
||||||
self.db.commit()
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def get_lbryid(self):
|
|
||||||
if 'lbryid' in self.db:
|
|
||||||
return binascii.unhexlify(self.db['lbryid'])
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def get_server_running_status(self):
|
|
||||||
if 'server_running' in self.db:
|
|
||||||
return json.loads(self.db['server_running'])
|
|
||||||
else:
|
|
||||||
return True
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def save_server_running_status(self, running):
|
|
||||||
self.db['server_running'] = json.dumps(running)
|
|
||||||
self.db.commit()
|
|
||||||
|
|
||||||
def get_default_data_payment_rate(self):
|
|
||||||
return self._get_payment_rate("default_data_payment_rate")
|
|
||||||
|
|
||||||
def save_default_data_payment_rate(self, rate):
|
|
||||||
return self._save_payment_rate("default_data_payment_rate", rate)
|
|
||||||
|
|
||||||
def get_server_data_payment_rate(self):
|
|
||||||
return self._get_payment_rate("server_data_payment_rate")
|
|
||||||
|
|
||||||
def save_server_data_payment_rate(self, rate):
|
|
||||||
return self._save_payment_rate("server_data_payment_rate", rate)
|
|
||||||
|
|
||||||
def get_server_crypt_info_payment_rate(self):
|
|
||||||
return self._get_payment_rate("server_crypt_info_payment_rate")
|
|
||||||
|
|
||||||
def save_server_crypt_info_payment_rate(self, rate):
|
|
||||||
return self._save_payment_rate("server_crypt_info_payment_rate", rate)
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def _get_payment_rate(self, rate_type):
|
|
||||||
if rate_type in self.db:
|
|
||||||
return json.loads(self.db[rate_type])
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def _save_payment_rate(self, rate_type, rate):
|
|
||||||
if rate is not None:
|
|
||||||
self.db[rate_type] = json.dumps(rate)
|
|
||||||
elif rate_type in self.db:
|
|
||||||
del self.db[rate_type]
|
|
||||||
self.db.commit()
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def get_query_handler_status(self, query_identifier):
|
|
||||||
if json.dumps(('q_h', query_identifier)) in self.db:
|
|
||||||
return json.loads(self.db[(json.dumps(('q_h', query_identifier)))])
|
|
||||||
else:
|
|
||||||
return True
|
|
||||||
|
|
||||||
def enable_query_handler(self, query_identifier):
|
|
||||||
return self._set_query_handler_status(query_identifier, True)
|
|
||||||
|
|
||||||
def disable_query_handler(self, query_identifier):
|
|
||||||
return self._set_query_handler_status(query_identifier, False)
|
|
||||||
|
|
||||||
@run_in_thread
|
|
||||||
def _set_query_handler_status(self, query_identifier, status):
|
|
||||||
self.db[json.dumps(('q_h', query_identifier))] = json.dumps(status)
|
|
||||||
self.db.commit()
|
|
Loading…
Reference in a new issue