# lbry-sdk/lbrynet/extras/daemon/ComponentManager.py
#
# Source-viewer metadata: 181 lines, 6.4 KiB, Python (Raw / Normal View / History)

import logging
# history-view timestamp: 2019-01-22 23:44:17 +01:00
import asyncio
# history-view timestamp: 2019-01-21 21:55:50 +01:00
from lbrynet.conf import Config
# history-view timestamp: 2019-01-22 23:44:17 +01:00
from lbrynet.error import ComponentStartConditionNotMet
from lbrynet.dht.peer import PeerManager
log = logging.getLogger(__name__)
# history-view timestamp: 2018-07-25 07:23:02 +02:00
class RegisteredConditions:
    """Namespace holding the registry of named conditions."""

    # Maps a condition's ``name`` attribute to the condition class registered
    # under it. This is shared class-level state: RequiredConditionType adds an
    # entry for every condition class it creates.
    conditions = {}
class RequiredConditionType(type):
    """Metaclass that records each condition class in RegisteredConditions.

    Every class created through this metaclass, except the one literally named
    ``RequiredCondition``, is stored in ``RegisteredConditions.conditions``
    under its ``name`` class attribute.
    """

    def __new__(mcs, name, bases, newattrs):
        """Create the class and register it under ``klass.name``.

        :raises SyntaxError: if another class is already registered under the
            same ``name``
        """
        klass = type.__new__(mcs, name, bases, newattrs)
        # The base class itself is only a template; it is never registered.
        if name == "RequiredCondition":
            return klass
        if klass.name in RegisteredConditions.conditions:
            raise SyntaxError("already have a component registered for \"%s\"" % klass.name)
        RegisteredConditions.conditions[klass.name] = klass
        return klass
class RequiredCondition(metaclass=RequiredConditionType):
    """Base class for a named condition that is evaluated against a component.

    Subclasses override the three class attributes and ``evaluate``; the
    metaclass registers each subclass under its ``name``.
    """

    # Key under which the subclass is registered and later looked up.
    name = ""
    # Name of the component whose object is handed to ``evaluate``.
    component = ""
    # Text reported to the caller when the condition evaluates falsy.
    message = ""

    @staticmethod
    def evaluate(component):
        """Return a truthy value when the condition holds for ``component``.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
# history-view timestamp: 2018-07-25 07:23:02 +02:00
class ComponentManager:
    """Instantiate a set of components and start/stop them in dependency order.

    Components are expected to expose ``component_name``, ``depends_on``,
    ``running``, ``component`` and the coroutines ``_setup()`` / ``_stop()``,
    and to be sortable (they are ordered with ``sorted`` inside each stage).
    """

    # Maps component name -> component class. Each class is instantiated with
    # the manager as its only argument unless it is skipped or overridden.
    default_component_classes = {}

    def __init__(self, conf: 'Config', analytics_manager=None, skip_components=None,
                 peer_manager=None, **override_components):
        """
        Build the component instances.

        :param conf: configuration object, stored as ``self.conf``
        :param analytics_manager: stored as ``self.analytics_manager``
        :param skip_components: names of default components that must not be created
        :param peer_manager: peer manager to share; a ``PeerManager`` is created when omitted
        :param override_components: ``name=component_class`` replacements for default components
        :raises SyntaxError: if an override names a component that is not a default component
        """
        self.conf = conf
        self.skip_components = skip_components or []
        self.loop = asyncio.get_event_loop()
        self.analytics_manager = analytics_manager
        self.component_classes = {}
        self.components = set()
        # No ``loop=`` argument: it was removed from asyncio primitives in Python 3.10.
        self.started = asyncio.Event()
        self.peer_manager = peer_manager or PeerManager(asyncio.get_event_loop_policy().get_event_loop())

        for component_name, component_class in self.default_component_classes.items():
            # An override is consumed even when the component is skipped, so it
            # is not reported as unexpected below.
            if component_name in override_components:
                component_class = override_components.pop(component_name)
            if component_name not in self.skip_components:
                self.component_classes[component_name] = component_class

        # Anything left over did not match a default component name.
        if override_components:
            raise SyntaxError("unexpected components: %s" % override_components)

        for component_class in self.component_classes.values():
            self.components.add(component_class(self))

    def evaluate_condition(self, condition_name):
        """
        Evaluate a registered condition against the component it names.

        :param condition_name: key in ``RegisteredConditions.conditions``
        :return: (tuple) ``(result, message)``; ``message`` is ``""`` when ``result`` is
            truthy, otherwise the condition's ``message``. Any exception raised while
            looking up the component or evaluating is logged and yields ``False``.
        :raises NameError: if no condition is registered under ``condition_name``
        """
        if condition_name not in RegisteredConditions.conditions:
            raise NameError(condition_name)
        condition = RegisteredConditions.conditions[condition_name]
        try:
            component = self.get_component(condition.component)
            result = condition.evaluate(component)
        except Exception:
            log.exception('failed to evaluate condition:')
            result = False
        return result, ("" if result else condition.message)

    def sort_components(self, reverse=False):
        """
        Sort components by requirements

        :param reverse: when true, return the stages in reversed order
        :return: (list) list of stages; each stage is a sorted list of components whose
            ``depends_on`` names were all placed in earlier stages
        :raises ComponentStartConditionNotMet: if some components can never be staged
        """
        steps = []
        staged = set()
        remaining = set(self.components)
        while remaining:
            # Only names staged in *earlier* passes count, so components that
            # depend on each other never share a stage.
            step = sorted(
                component for component in remaining
                if all(needed in staged for needed in (component.depends_on or ()))
            )
            if not step:
                raise ComponentStartConditionNotMet("Unresolved dependencies for: %s" % remaining)
            staged.update(component.component_name for component in step)
            remaining.difference_update(step)
            steps.append(step)
        if reverse:
            steps.reverse()
        return steps

    async def setup(self, **callbacks):
        """
        Start Components in sequence sorted by requirements

        :param callbacks: ``component_name=callable``; each callable is invoked with the
            component after its ``_setup()`` finished, and awaited if it returns a coroutine
        :raises NameError: if a callback names a component that is neither managed nor skipped
        :raises ValueError: if a callback is not callable
        """
        for component_name, cb in callbacks.items():
            if component_name not in self.component_classes and component_name not in self.skip_components:
                raise NameError("unknown component: %s" % component_name)
            if not callable(cb):
                # Wrapped in a tuple so a tuple-valued ``cb`` is formatted, not unpacked.
                raise ValueError("%s is not callable" % (cb,))

        async def _setup(component):
            await component._setup()
            if component.component_name in callbacks:
                maybe_coro = callbacks[component.component_name](component)
                if asyncio.iscoroutine(maybe_coro):
                    await asyncio.create_task(maybe_coro)

        for stage in self.sort_components():
            # asyncio.wait() rejects bare coroutine objects since Python 3.11,
            # so each start is wrapped in a task first.
            needing_start = [
                asyncio.create_task(_setup(component)) for component in stage if not component.running
            ]
            if needing_start:
                # NOTE: asyncio.wait() does not re-raise exceptions from the
                # tasks, so a failing component does not abort the startup here.
                await asyncio.wait(needing_start)
        self.started.set()

    async def stop(self):
        """
        Stop Components in reversed startup order
        """
        for stage in self.sort_components(reverse=True):
            # Tasks instead of bare coroutines, and no ``loop=`` argument: both
            # are required by asyncio.wait() on current Python versions.
            needing_stop = [
                asyncio.create_task(component._stop()) for component in stage if component.running
            ]
            if needing_stop:
                await asyncio.wait(needing_stop)

    def all_components_running(self, *component_names):
        """
        Check if components are running

        :param component_names: names of the components to check
        :return: (bool) True if all specified components are running
        :raises NameError: if an unknown name is reached before a stopped component is found
        """
        by_name = {component.component_name: component for component in self.components}
        for name in component_names:
            if name not in by_name:
                raise NameError("%s is not a known Component" % name)
            if not by_name[name].running:
                return False
        return True

    def get_components_status(self):
        """
        List status of all the components, whether they are running or not

        :return: (dict) {(str) component_name: (bool) True if running else False}
        """
        return {
            component.component_name: component.running
            for component in self.components
        }

    def get_component(self, component_name):
        """
        Look up a managed component by name.

        :return: the ``component`` attribute of the matching component
        :raises NameError: if no managed component has that name
        """
        for component in self.components:
            if component.component_name == component_name:
                return component.component
        raise NameError(component_name)