diff --git a/hub/scribe/service.py b/hub/scribe/service.py index 5602e0f..206e83c 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -1729,6 +1729,7 @@ class BlockchainProcessorService(BlockchainService): await self.run_in_thread_with_lock(flush) def _iter_start_tasks(self): + yield self.start_prometheus() while self.db.db_version < max(self.db.DB_VERSIONS): if self.db.db_version == 7: from hub.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION @@ -1765,3 +1766,4 @@ class BlockchainProcessorService(BlockchainService): yield self._ready_to_stop.wait() yield self._stop_cancellable_tasks() yield self.daemon.close() + yield self.stop_prometheus() diff --git a/hub/service.py b/hub/service.py index d1d4f0e..83e6602 100644 --- a/hub/service.py +++ b/hub/service.py @@ -30,6 +30,7 @@ class BlockchainService: self.last_state: typing.Optional[DBState] = None self.secondary_name = secondary_name self._stopping = False + self.prometheus_server: typing.Optional[PrometheusServer] = None self.db = None self.open_db() @@ -42,6 +43,16 @@ class BlockchainService: index_address_status=env.index_address_status ) + async def start_prometheus(self): + if not self.prometheus_server and self.env.prometheus_port: + self.prometheus_server = PrometheusServer() + await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port) + + async def stop_prometheus(self): + if self.prometheus_server: + await self.prometheus_server.stop() + self.prometheus_server = None + def start_cancellable(self, run, *args): _flag = asyncio.Event() self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) @@ -131,7 +142,6 @@ class BlockchainReaderService(BlockchainService): def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'): super().__init__(env, secondary_name, thread_workers, thread_prefix) self._refresh_interval = 0.1 - self.prometheus_server: typing.Optional[PrometheusServer] = None self.finished_initial_catch_up = asyncio.Event() async def poll_for_changes(self): @@ -259,13 +269,3 @@ class BlockchainReaderService(BlockchainService): def _iter_stop_tasks(self): yield self.stop_prometheus() yield self._stop_cancellable_tasks() - - async def start_prometheus(self): - if not self.prometheus_server and self.env.prometheus_port: - self.prometheus_server = PrometheusServer() - await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port) - - async def stop_prometheus(self): - if self.prometheus_server: - await self.prometheus_server.stop() - self.prometheus_server = None