import asyncio
import binascii
import os
import logging
import random
from hashlib import sha1
from tempfile import mkdtemp
from typing import Optional

import libtorrent


NOTIFICATION_MASKS = [
    "error",
    "peer",
    "port_mapping",
    "storage",
    "tracker",
    "debug",
    "status",
    "progress",
    "ip_block",
    "dht",
    "stats",
    "session_log",
    "torrent_log",
    "peer_log",
    "incoming_request",
    "dht_log",
    "dht_operation",
    "port_mapping_log",
    "picker_log",
    "file_progress",
    "piece_progress",
    "upload",
    "block_progress"
]
log = logging.getLogger(__name__)


DEFAULT_FLAGS = (  # fixme: somehow the logic here is inverted?
        libtorrent.add_torrent_params_flags_t.flag_auto_managed
        | libtorrent.add_torrent_params_flags_t.flag_update_subscribe
)


def get_notification_type(notification) -> str:
    for i, notification_type in enumerate(NOTIFICATION_MASKS):
        if (1 << i) & notification:
            return notification_type
    raise ValueError("unrecognized notification type")


class TorrentHandle:
    def __init__(self, loop, executor, handle):
        self._loop = loop
        self._executor = executor
        self._handle: libtorrent.torrent_handle = handle
        self.started = asyncio.Event(loop=loop)
        self.finished = asyncio.Event(loop=loop)
        self.metadata_completed = asyncio.Event(loop=loop)
        self.size = 0
        self.total_wanted_done = 0
        self.name = ''
        self.tasks = []
        self.torrent_file: Optional[libtorrent.file_storage] = None
        self._base_path = None
        self._handle.set_sequential_download(1)

    @property
    def largest_file(self) -> Optional[str]:
        if not self.torrent_file:
            return None
        index = self.largest_file_index
        return os.path.join(self._base_path, self.torrent_file.at(index).path)

    @property
    def largest_file_index(self):
        largest_size, index = 0, 0
        for file_num in range(self.torrent_file.num_files()):
            if self.torrent_file.file_size(file_num) > largest_size:
                largest_size = self.torrent_file.file_size(file_num)
                index = file_num
        return index

    def stop_tasks(self):
        while self.tasks:
            self.tasks.pop().cancel()

    def _show_status(self):
        # fixme: cleanup
        if not self._handle.is_valid():
            return
        status = self._handle.status()
        if status.has_metadata:
            self.size = status.total_wanted
            self.total_wanted_done = status.total_wanted_done
            self.name = status.name
            if not self.metadata_completed.is_set():
                self.metadata_completed.set()
                log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
                self.torrent_file = self._handle.get_torrent_info().files()
                self._base_path = status.save_path
            first_piece = self.torrent_file.at(self.largest_file_index).offset
            if not self.started.is_set():
                if self._handle.have_piece(first_piece):
                    self.started.set()
                else:
                    # prioritize it
                    self._handle.set_piece_deadline(first_piece, 100)
        if not status.is_seeding:
            log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
                      status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
                      status.num_peers, status.num_seeds, status.state, status.save_path)
        elif not self.finished.is_set():
            self.finished.set()
            log.info("Torrent finished: %s", self.name)

    async def status_loop(self):
        while True:
            self._show_status()
            if self.finished.is_set():
                break
            await asyncio.sleep(0.1, loop=self._loop)

    async def pause(self):
        await self._loop.run_in_executor(
            self._executor, self._handle.pause
        )

    async def resume(self):
        await self._loop.run_in_executor(
            self._executor, lambda: self._handle.resume()  # pylint: disable=unnecessary-lambda
        )


class TorrentSession:
    def __init__(self, loop, executor):
        self._loop = loop
        self._executor = executor
        self._session: Optional[libtorrent.session] = None
        self._handles = {}
        self.tasks = []
        self.wait_start = True

    async def add_fake_torrent(self):
        tmpdir = mkdtemp()
        info, btih = _create_fake_torrent(tmpdir)
        flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
        handle = self._session.add_torrent({
            'ti': info, 'save_path': tmpdir, 'flags': flags
        })
        self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
        return btih

    async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
        settings = {
            'listen_interfaces': f"{interface}:{port}",
            'enable_outgoing_utp': True,
            'enable_incoming_utp': True,
            'enable_outgoing_tcp': False,
            'enable_incoming_tcp': False
        }
        self._session = await self._loop.run_in_executor(
            self._executor, libtorrent.session, settings  # pylint: disable=c-extension-no-member
        )
        self.tasks.append(self._loop.create_task(self.process_alerts()))

    def stop(self):
        while self.tasks:
            self.tasks.pop().cancel()
        self._session.save_state()
        self._session.pause()
        self._session.stop_dht()
        self._session.stop_lsd()
        self._session.stop_natpmp()
        self._session.stop_upnp()
        self._session = None

    def _pop_alerts(self):
        for alert in self._session.pop_alerts():
            log.info("torrent alert: %s", alert)

    async def process_alerts(self):
        while True:
            await self._loop.run_in_executor(
                self._executor, self._pop_alerts
            )
            await asyncio.sleep(1, loop=self._loop)

    async def pause(self):
        await self._loop.run_in_executor(
            self._executor, lambda: self._session.save_state()  # pylint: disable=unnecessary-lambda
        )
        await self._loop.run_in_executor(
            self._executor, lambda: self._session.pause()  # pylint: disable=unnecessary-lambda
        )

    async def resume(self):
        await self._loop.run_in_executor(
            self._executor, self._session.resume
        )

    def _add_torrent(self, btih: str, download_directory: Optional[str]):
        params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS}
        if download_directory:
            params['save_path'] = download_directory
        handle = self._session.add_torrent(params)
        handle.force_dht_announce()
        self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)

    def full_path(self, btih):
        return self._handles[btih].largest_file

    async def add_torrent(self, btih, download_path):
        await self._loop.run_in_executor(
            self._executor, self._add_torrent, btih, download_path
        )
        self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
        await self._handles[btih].metadata_completed.wait()
        if self.wait_start:
            # fixme: temporary until we add streaming support, otherwise playback fails!
            await self._handles[btih].started.wait()

    def remove_torrent(self, btih, remove_files=False):
        if btih in self._handles:
            handle = self._handles[btih]
            handle.stop_tasks()
            self._session.remove_torrent(handle._handle, 1 if remove_files else 0)
            self._handles.pop(btih)

    async def save_file(self, btih, download_directory):
        handle = self._handles[btih]
        await handle.resume()

    def get_size(self, btih):
        return self._handles[btih].size

    def get_name(self, btih):
        return self._handles[btih].name

    def get_downloaded(self, btih):
        return self._handles[btih].total_wanted_done

    def is_completed(self, btih):
        return self._handles[btih].finished.is_set()


def get_magnet_uri(btih):
    return f"magnet:?xt=urn:btih:{btih}"


def _create_fake_torrent(tmpdir):
    # beware, that's just for testing
    path = os.path.join(tmpdir, 'tmp')
    with open(path, 'wb') as myfile:
        size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024)
    file_storage = libtorrent.file_storage()
    file_storage.add_file('tmp', size)
    t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024)
    libtorrent.set_piece_hashes(t, tmpdir)
    info = libtorrent.torrent_info(t.generate())
    btih = sha1(info.metadata()).hexdigest()
    return info, btih


async def main():
    if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent"):
        os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent")
    if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso"):
        os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso")

    btih = "dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c"

    executor = None
    session = TorrentSession(asyncio.get_event_loop(), executor)
    session2 = TorrentSession(asyncio.get_event_loop(), executor)
    await session.bind('localhost', port=4040)
    await session2.bind('localhost', port=4041)
    btih = await session.add_fake_torrent()
    session2._session.add_dht_node(('localhost', 4040))
    await session2.add_torrent(btih, "/tmp/down")
    while True:
        await asyncio.sleep(100)
    await session.pause()
    executor.shutdown()


if __name__ == "__main__":
    asyncio.run(main())