diff --git a/hub/db/db.py b/hub/db/db.py
index e3e5e40..6ca8685 100644
--- a/hub/db/db.py
+++ b/hub/db/db.py
@@ -49,6 +49,7 @@ class SecondaryDB:
         self._reorg_limit = reorg_limit
         self._cache_all_tx_hashes = cache_all_tx_hashes
         self._secondary_name = secondary_name
+        self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
         if secondary_name:
             assert max_open_files == -1, 'max open files must be -1 for secondary readers'
         self._db_max_open_files = max_open_files
@@ -1000,6 +1001,27 @@ class SecondaryDB:
         secondary_path = '' if not self._secondary_name else os.path.join(
             self._db_dir, self._secondary_name
         )
+        open_db_canary = None
+
+        if self._secondary_name:
+            open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
+            if os.path.exists(open_db_canary):
+                with open(self._need_restart_path, 'w+') as f:
+                    f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
+                raise RuntimeError('scribe restart is needed')
+            else:
+                with open(open_db_canary, 'w'):
+                    pass
+        else:
+            herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
+            es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
+            if os.path.exists(herald_db_canary):
+                os.remove(herald_db_canary)
+            if os.path.exists(es_sync_db_canary):
+                os.remove(es_sync_db_canary)
+            if os.path.exists(self._need_restart_path):
+                os.remove(self._need_restart_path)
+
         db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
         self.prefix_db = PrefixDB(
             db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
@@ -1008,6 +1030,7 @@ class SecondaryDB:
         )

         if secondary_path != '':
+            os.remove(open_db_canary)
             self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
         else:
             self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
diff --git a/hub/scribe/service.py b/hub/scribe/service.py
index ac21752..b6579cc 100644
--- a/hub/scribe/service.py
+++ b/hub/scribe/service.py
@@ -1,3 +1,4 @@
+import os
 import time
 import asyncio
 import typing
@@ -2069,6 +2070,8 @@ class BlockchainProcessorService(BlockchainService):
         """Loop forever processing blocks as they arrive."""
         self._caught_up_event = caught_up_event
         try:
+            if os.path.exists(self.db._need_restart_path):
+                raise MemoryError()
             if self.height != self.daemon.cached_height() and not self.db.catching_up:
                 await self._need_catch_up()  # tell the readers that we're still catching up with lbrycrd/lbcd
             while not self._stopping: