diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py
new file mode 100644
index 000000000..46f39212b
--- /dev/null
+++ b/lbrynet/daemon/Components.py
@@ -0,0 +1,89 @@
+import os
+import logging
+from twisted.internet import defer, threads
+from lbrynet import conf
+from lbrynet.database.storage import SQLiteStorage
+from lbrynet.daemon.Component import Component
+
+log = logging.getLogger(__name__)
+
+# settings must be initialized before this file is imported
+
+DATABASE_COMPONENT = "database"
+
+
+class DatabaseComponent(Component):
+    component_name = DATABASE_COMPONENT
+    storage = None
+
+    @staticmethod
+    def get_db_dir():
+        return conf.settings['data_dir']
+
+    @staticmethod
+    def get_download_directory():
+        return conf.settings['download_directory']
+
+    @staticmethod
+    def get_blobfile_dir():
+        return conf.settings['BLOBFILES_DIR']
+
+    @staticmethod
+    def get_current_db_revision():
+        return 7
+
+    @staticmethod
+    def get_revision_filename():
+        return conf.settings.get_db_revision_filename()
+
+    @staticmethod
+    def _write_db_revision_file(version_num):
+        with open(conf.settings.get_db_revision_filename(), mode='w') as db_revision:
+            db_revision.write(str(version_num))
+
+    @classmethod
+    @defer.inlineCallbacks
+    def setup(cls):
+        # check directories exist, create them if they don't
+        log.info("Loading databases")
+        if not os.path.exists(cls.get_download_directory()):
+            os.mkdir(cls.get_download_directory())
+        if not os.path.exists(cls.get_db_dir()):
+            os.mkdir(cls.get_db_dir())
+            cls._write_db_revision_file(cls.get_current_db_revision())
+            log.debug("Created the db revision file: %s", cls.get_revision_filename())
+        if not os.path.exists(cls.get_blobfile_dir()):
+            os.mkdir(cls.get_blobfile_dir())
+            log.debug("Created the blobfile directory: %s", str(cls.get_blobfile_dir()))
+        if not os.path.exists(cls.get_revision_filename()):
+            log.warning("db_revision file not found. Creating it")
+            cls._write_db_revision_file(cls.get_current_db_revision())
+
+        # check the db migration and run any needed migrations
+        migrated = False
+        with open(cls.get_revision_filename(), "r") as revision_read_handle:
+            old_revision = int(revision_read_handle.read().strip())
+
+        if old_revision > cls.get_current_db_revision():
+            raise Exception('This version of lbrynet is not compatible with the database\n'
+                            'Your database is revision %i, expected %i' %
+                            (old_revision, cls.get_current_db_revision()))
+        if old_revision < cls.get_current_db_revision():
+            from lbrynet.database.migrator import dbmigrator
+            log.info("Upgrading your databases (revision %i to %i)", old_revision, cls.get_current_db_revision())
+            yield threads.deferToThread(
+                dbmigrator.migrate_db, cls.get_db_dir(), old_revision, cls.get_current_db_revision()
+            )
+            cls._write_db_revision_file(cls.get_current_db_revision())
+            log.info("Finished upgrading the databases.")
+            migrated = True
+
+        # start SQLiteStorage
+        cls.storage = SQLiteStorage(cls.get_db_dir())
+        yield cls.storage.setup()
+        defer.returnValue(migrated)
+
+    @classmethod
+    @defer.inlineCallbacks
+    def stop(cls):
+        yield cls.storage.stop()
diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index 85969e07c..b75037819 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -185,7 +185,6 @@ class Daemon(AuthJSONRPCServer):
     def __init__(self, analytics_manager):
         AuthJSONRPCServer.__init__(self, conf.settings['use_auth_http'])
         self.db_dir = conf.settings['data_dir']
-        self.storage = SQLiteStorage(self.db_dir)
         self.download_directory = conf.settings['download_directory']
         if conf.settings['BLOBFILES_DIR'] == "blobfiles":
             self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
@@ -233,6 +232,7 @@ class Daemon(AuthJSONRPCServer):
         self.looping_call_manager = LoopingCallManager(calls)
         self.sd_identifier = StreamDescriptorIdentifier()
         self.lbry_file_manager = None
+        self.storage = None
 
     @defer.inlineCallbacks
     def setup(self):
@@ -246,9 +246,8 @@
         self.exchange_rate_manager.start()
 
         yield self._initial_setup()
-        yield threads.deferToThread(self._setup_data_directory)
-        migrated = yield self._check_db_migration()
-        yield self.storage.setup()
+        yield self.component_manager.setup()
+        self.storage = self.component_manager.get_component("database").storage
         yield self._get_session()
         yield self._check_wallet_locked()
         yield self._start_analytics()
@@ -262,15 +261,15 @@
         self.startup_status = STARTUP_STAGES[5]
         log.info("Started lbrynet-daemon")
 
-        ###
-        # this should be removed with the next db revision
-        if migrated:
-            missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids()
-            while missing_channel_claim_ids:  # in case there are a crazy amount lets batch to be safe
-                batch = missing_channel_claim_ids[:100]
-                _ = yield self.session.wallet.get_claims_by_ids(*batch)
-                missing_channel_claim_ids = missing_channel_claim_ids[100:]
-        ###
+        # ###
+        # # this should be removed with the next db revision
+        # if migrated:
+        #     missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids()
+        #     while missing_channel_claim_ids:  # in case there are a crazy amount lets batch to be safe
+        #         batch = missing_channel_claim_ids[:100]
+        #         _ = yield self.session.wallet.get_claims_by_ids(*batch)
+        #         missing_channel_claim_ids = missing_channel_claim_ids[100:]
+        # ###
 
         self._auto_renew()
 
@@ -477,50 +476,6 @@ class Daemon(AuthJSONRPCServer):
 
         return defer.succeed(True)
 
-    def _write_db_revision_file(self, version_num):
-        with open(self.db_revision_file, mode='w') as db_revision:
-            db_revision.write(str(version_num))
-
-    def _setup_data_directory(self):
-        old_revision = 1
-        self.startup_status = STARTUP_STAGES[1]
-        log.info("Loading databases")
-        if not os.path.exists(self.download_directory):
-            os.mkdir(self.download_directory)
-        if not os.path.exists(self.db_dir):
-            os.mkdir(self.db_dir)
-            self._write_db_revision_file(self.current_db_revision)
-            log.debug("Created the db revision file: %s", self.db_revision_file)
-        if not os.path.exists(self.blobfile_dir):
-            os.mkdir(self.blobfile_dir)
-            log.debug("Created the blobfile directory: %s", str(self.blobfile_dir))
-        if not os.path.exists(self.db_revision_file):
-            log.warning("db_revision file not found. Creating it")
-            self._write_db_revision_file(self.current_db_revision)
-
-    @defer.inlineCallbacks
-    def _check_db_migration(self):
-        old_revision = 1
-        migrated = False
-        if os.path.exists(self.db_revision_file):
-            with open(self.db_revision_file, "r") as revision_read_handle:
-                old_revision = int(revision_read_handle.read().strip())
-
-        if old_revision > self.current_db_revision:
-            raise Exception('This version of lbrynet is not compatible with the database\n'
-                            'Your database is revision %i, expected %i' %
-                            (old_revision, self.current_db_revision))
-        if old_revision < self.current_db_revision:
-            from lbrynet.database.migrator import dbmigrator
-            log.info("Upgrading your databases (revision %i to %i)", old_revision, self.current_db_revision)
-            yield threads.deferToThread(
-                dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision
-            )
-            self._write_db_revision_file(self.current_db_revision)
-            log.info("Finished upgrading the databases.")
-            migrated = True
-        defer.returnValue(migrated)
-
     @defer.inlineCallbacks
     def _setup_lbry_file_manager(self):
         log.info('Starting the file manager')
diff --git a/lbrynet/daemon/__init__.py b/lbrynet/daemon/__init__.py
index 7461e1c00..8e0f5feca 100644
--- a/lbrynet/daemon/__init__.py
+++ b/lbrynet/daemon/__init__.py
@@ -1,3 +1,3 @@
+import Components  # register Component classes
 from lbrynet.daemon.auth.client import LBRYAPIClient
-
 get_client = LBRYAPIClient.get_client
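
Context for reviewers: the new DatabaseComponent relies on a Component base class and a component_manager attribute on the daemon that are not part of this diff. The snippet below is a rough sketch only of how the daemon is expected to drive the component, inferred from the two calls visible above (component_manager.setup() and get_component("database").storage); the ComponentManager class here is a hypothetical stand-in, not the real implementation introduced elsewhere.

# Illustrative only: a hypothetical ComponentManager matching the calls made
# from Daemon.setup() in this diff.
from twisted.internet import defer


class ComponentManager(object):
    def __init__(self, *component_classes):
        # map component_name -> component class, e.g. "database" -> DatabaseComponent
        self.components = {cls.component_name: cls for cls in component_classes}

    @defer.inlineCallbacks
    def setup(self):
        # Daemon.setup() yields this once; DatabaseComponent.setup() then creates
        # the data/blobfile directories, runs any migrations and opens SQLiteStorage.
        for component in self.components.values():
            yield defer.maybeDeferred(component.setup)

    def get_component(self, name):
        # Daemon then reads: self.component_manager.get_component("database").storage
        return self.components[name]

Shutdown would mirror startup: the manager calls each component's stop(), which for the database component simply closes SQLiteStorage (see DatabaseComponent.stop above).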