forked from LBRYCommunity/lbry-sdk
convert directory and SQLiteStorage setup to be a Component
This commit is contained in:
parent
68b31a09b4
commit
eb11da9b19
3 changed files with 102 additions and 58 deletions
89
lbrynet/daemon/Components.py
Normal file
89
lbrynet/daemon/Components.py
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from twisted.internet import defer, threads
|
||||||
|
from lbrynet import conf
|
||||||
|
from lbrynet.database.storage import SQLiteStorage
|
||||||
|
from lbrynet.daemon.Component import Component
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# settings must be initialized before this file is imported
|
||||||
|
|
||||||
|
DATABASE_COMPONENT = "database"
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseComponent(Component):
|
||||||
|
component_name = DATABASE_COMPONENT
|
||||||
|
storage = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_db_dir():
|
||||||
|
return conf.settings['data_dir']
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_download_directory():
|
||||||
|
return conf.settings['download_directory']
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_blobfile_dir():
|
||||||
|
return conf.settings['BLOBFILES_DIR']
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_current_db_revision():
|
||||||
|
return 7
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_revision_filename():
|
||||||
|
return conf.settings.get_db_revision_filename()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _write_db_revision_file(version_num):
|
||||||
|
with open(conf.settings.get_db_revision_filename(), mode='w') as db_revision:
|
||||||
|
db_revision.write(str(version_num))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def setup(cls):
|
||||||
|
# check directories exist, create them if they don't
|
||||||
|
log.info("Loading databases")
|
||||||
|
if not os.path.exists(cls.get_download_directory()):
|
||||||
|
os.mkdir(cls.get_download_directory())
|
||||||
|
if not os.path.exists(cls.get_db_dir()):
|
||||||
|
os.mkdir(cls.get_db_dir())
|
||||||
|
cls._write_db_revision_file(cls.get_current_db_revision())
|
||||||
|
log.debug("Created the db revision file: %s", cls.get_revision_filename())
|
||||||
|
if not os.path.exists(cls.get_blobfile_dir()):
|
||||||
|
os.mkdir(cls.get_blobfile_dir())
|
||||||
|
log.debug("Created the blobfile directory: %s", str(cls.get_blobfile_dir()))
|
||||||
|
if not os.path.exists(cls.get_revision_filename()):
|
||||||
|
log.warning("db_revision file not found. Creating it")
|
||||||
|
cls._write_db_revision_file(cls.get_current_db_revision())
|
||||||
|
|
||||||
|
# check the db migration and run any needed migrations
|
||||||
|
migrated = False
|
||||||
|
with open(cls.get_revision_filename(), "r") as revision_read_handle:
|
||||||
|
old_revision = int(revision_read_handle.read().strip())
|
||||||
|
|
||||||
|
if old_revision > cls.get_current_db_revision():
|
||||||
|
raise Exception('This version of lbrynet is not compatible with the database\n'
|
||||||
|
'Your database is revision %i, expected %i' %
|
||||||
|
(old_revision, cls.get_current_db_revision()))
|
||||||
|
if old_revision < cls.get_current_db_revision():
|
||||||
|
from lbrynet.database.migrator import dbmigrator
|
||||||
|
log.info("Upgrading your databases (revision %i to %i)", old_revision, cls.get_current_db_revision())
|
||||||
|
yield threads.deferToThread(
|
||||||
|
dbmigrator.migrate_db, cls.get_db_dir(), old_revision, cls.get_current_db_revision()
|
||||||
|
)
|
||||||
|
cls._write_db_revision_file(cls.get_current_db_revision())
|
||||||
|
log.info("Finished upgrading the databases.")
|
||||||
|
migrated = True
|
||||||
|
|
||||||
|
# start SQLiteStorage
|
||||||
|
cls.storage = SQLiteStorage(cls.get_db_dir())
|
||||||
|
yield cls.storage.setup()
|
||||||
|
defer.returnValue(migrated)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def stop(cls):
|
||||||
|
yield cls.storage.stop()
|
|
@ -185,7 +185,6 @@ class Daemon(AuthJSONRPCServer):
|
||||||
def __init__(self, analytics_manager):
|
def __init__(self, analytics_manager):
|
||||||
AuthJSONRPCServer.__init__(self, conf.settings['use_auth_http'])
|
AuthJSONRPCServer.__init__(self, conf.settings['use_auth_http'])
|
||||||
self.db_dir = conf.settings['data_dir']
|
self.db_dir = conf.settings['data_dir']
|
||||||
self.storage = SQLiteStorage(self.db_dir)
|
|
||||||
self.download_directory = conf.settings['download_directory']
|
self.download_directory = conf.settings['download_directory']
|
||||||
if conf.settings['BLOBFILES_DIR'] == "blobfiles":
|
if conf.settings['BLOBFILES_DIR'] == "blobfiles":
|
||||||
self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
|
self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
|
||||||
|
@ -233,6 +232,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
self.looping_call_manager = LoopingCallManager(calls)
|
self.looping_call_manager = LoopingCallManager(calls)
|
||||||
self.sd_identifier = StreamDescriptorIdentifier()
|
self.sd_identifier = StreamDescriptorIdentifier()
|
||||||
self.lbry_file_manager = None
|
self.lbry_file_manager = None
|
||||||
|
self.storage = None
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def setup(self):
|
def setup(self):
|
||||||
|
@ -246,9 +246,8 @@ class Daemon(AuthJSONRPCServer):
|
||||||
self.exchange_rate_manager.start()
|
self.exchange_rate_manager.start()
|
||||||
|
|
||||||
yield self._initial_setup()
|
yield self._initial_setup()
|
||||||
yield threads.deferToThread(self._setup_data_directory)
|
yield self.component_manager.setup()
|
||||||
migrated = yield self._check_db_migration()
|
self.storage = self.component_manager.get_component("database").storage
|
||||||
yield self.storage.setup()
|
|
||||||
yield self._get_session()
|
yield self._get_session()
|
||||||
yield self._check_wallet_locked()
|
yield self._check_wallet_locked()
|
||||||
yield self._start_analytics()
|
yield self._start_analytics()
|
||||||
|
@ -262,15 +261,15 @@ class Daemon(AuthJSONRPCServer):
|
||||||
self.startup_status = STARTUP_STAGES[5]
|
self.startup_status = STARTUP_STAGES[5]
|
||||||
log.info("Started lbrynet-daemon")
|
log.info("Started lbrynet-daemon")
|
||||||
|
|
||||||
###
|
# ###
|
||||||
# this should be removed with the next db revision
|
# # this should be removed with the next db revision
|
||||||
if migrated:
|
# if migrated:
|
||||||
missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids()
|
# missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids()
|
||||||
while missing_channel_claim_ids: # in case there are a crazy amount lets batch to be safe
|
# while missing_channel_claim_ids: # in case there are a crazy amount lets batch to be safe
|
||||||
batch = missing_channel_claim_ids[:100]
|
# batch = missing_channel_claim_ids[:100]
|
||||||
_ = yield self.session.wallet.get_claims_by_ids(*batch)
|
# _ = yield self.session.wallet.get_claims_by_ids(*batch)
|
||||||
missing_channel_claim_ids = missing_channel_claim_ids[100:]
|
# missing_channel_claim_ids = missing_channel_claim_ids[100:]
|
||||||
###
|
# ###
|
||||||
|
|
||||||
self._auto_renew()
|
self._auto_renew()
|
||||||
|
|
||||||
|
@ -477,50 +476,6 @@ class Daemon(AuthJSONRPCServer):
|
||||||
|
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
def _write_db_revision_file(self, version_num):
|
|
||||||
with open(self.db_revision_file, mode='w') as db_revision:
|
|
||||||
db_revision.write(str(version_num))
|
|
||||||
|
|
||||||
def _setup_data_directory(self):
|
|
||||||
old_revision = 1
|
|
||||||
self.startup_status = STARTUP_STAGES[1]
|
|
||||||
log.info("Loading databases")
|
|
||||||
if not os.path.exists(self.download_directory):
|
|
||||||
os.mkdir(self.download_directory)
|
|
||||||
if not os.path.exists(self.db_dir):
|
|
||||||
os.mkdir(self.db_dir)
|
|
||||||
self._write_db_revision_file(self.current_db_revision)
|
|
||||||
log.debug("Created the db revision file: %s", self.db_revision_file)
|
|
||||||
if not os.path.exists(self.blobfile_dir):
|
|
||||||
os.mkdir(self.blobfile_dir)
|
|
||||||
log.debug("Created the blobfile directory: %s", str(self.blobfile_dir))
|
|
||||||
if not os.path.exists(self.db_revision_file):
|
|
||||||
log.warning("db_revision file not found. Creating it")
|
|
||||||
self._write_db_revision_file(self.current_db_revision)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _check_db_migration(self):
|
|
||||||
old_revision = 1
|
|
||||||
migrated = False
|
|
||||||
if os.path.exists(self.db_revision_file):
|
|
||||||
with open(self.db_revision_file, "r") as revision_read_handle:
|
|
||||||
old_revision = int(revision_read_handle.read().strip())
|
|
||||||
|
|
||||||
if old_revision > self.current_db_revision:
|
|
||||||
raise Exception('This version of lbrynet is not compatible with the database\n'
|
|
||||||
'Your database is revision %i, expected %i' %
|
|
||||||
(old_revision, self.current_db_revision))
|
|
||||||
if old_revision < self.current_db_revision:
|
|
||||||
from lbrynet.database.migrator import dbmigrator
|
|
||||||
log.info("Upgrading your databases (revision %i to %i)", old_revision, self.current_db_revision)
|
|
||||||
yield threads.deferToThread(
|
|
||||||
dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision
|
|
||||||
)
|
|
||||||
self._write_db_revision_file(self.current_db_revision)
|
|
||||||
log.info("Finished upgrading the databases.")
|
|
||||||
migrated = True
|
|
||||||
defer.returnValue(migrated)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _setup_lbry_file_manager(self):
|
def _setup_lbry_file_manager(self):
|
||||||
log.info('Starting the file manager')
|
log.info('Starting the file manager')
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
|
import Components # register Component classes
|
||||||
from lbrynet.daemon.auth.client import LBRYAPIClient
|
from lbrynet.daemon.auth.client import LBRYAPIClient
|
||||||
|
|
||||||
get_client = LBRYAPIClient.get_client
|
get_client = LBRYAPIClient.get_client
|
||||||
|
|
Loading…
Reference in a new issue