import signal
import logging
import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
import typing

import lbry
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYSessionManager
from lbry.prometheus import PrometheusServer


class Server:

    def __init__(self, env):
        self.env = env
        self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
        self.shutdown_event = asyncio.Event()
        self.cancellable_tasks = []

        self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
        self.db = db = LevelDB(env)
        self.bp = bp = BlockProcessor(env, db, daemon, self.shutdown_event)
        self.prometheus_server: typing.Optional[PrometheusServer] = None

        self.session_mgr = LBRYSessionManager(
            env, db, bp, daemon, self.shutdown_event
        )
        self._indexer_task = None

    async def start(self):
        env = self.env
        min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
        self.log.info(f'software version: {lbry.__version__}')
        self.log.info(f'supported protocol versions: {min_str}-{max_str}')
        self.log.info(f'event loop policy: {env.loop_policy}')
        self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')

        await self.daemon.height()

        def _start_cancellable(run, *args):
            _flag = asyncio.Event()
            self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
            return _flag.wait()

        await self.start_prometheus()
        if self.env.udp_port:
            await self.bp.status_server.start(
                0, bytes.fromhex(self.bp.coin.GENESIS_HASH)[::-1], self.env.country,
                self.env.host, self.env.udp_port, self.env.allow_lan_udp
            )
        await _start_cancellable(self.bp.fetch_and_process_blocks)

        await self.db.populate_header_merkle_cache()
        await _start_cancellable(self.bp.mempool.keep_synchronized)
        await _start_cancellable(self.session_mgr.serve, self.bp.mempool)

    async def stop(self):
        for task in reversed(self.cancellable_tasks):
            task.cancel()
        await asyncio.wait(self.cancellable_tasks)
        if self.prometheus_server:
            await self.prometheus_server.stop()
            self.prometheus_server = None
        self.shutdown_event.set()
        await self.daemon.close()

    def run(self):
        loop = asyncio.get_event_loop()
        executor = ThreadPoolExecutor(4)
        loop.set_default_executor(executor)

        def __exit():
            raise SystemExit()
        try:
            loop.add_signal_handler(signal.SIGINT, __exit)
            loop.add_signal_handler(signal.SIGTERM, __exit)
            loop.run_until_complete(self.start())
            loop.run_until_complete(self.shutdown_event.wait())
        except (SystemExit, KeyboardInterrupt):
            pass
        finally:
            loop.run_until_complete(self.stop())
            executor.shutdown(True)

    async def start_prometheus(self):
        if not self.prometheus_server and self.env.prometheus_port:
            self.prometheus_server = PrometheusServer()
            await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)