import base64
import codecs
import datetime
import random
import socket
import string
import json
import typing
import asyncio
import ssl
import logging
import ipaddress
import contextlib
import functools
import collections
import hashlib
import pkg_resources

import certifi
import aiohttp
from lbry.schema.claim import Claim

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# defining these time functions here allows for easier overriding in testing
def now():
    """Return the current local date and time as a naive datetime."""
    return datetime.datetime.now()

def utcnow():
    """Return the current UTC date and time as a naive datetime (tzinfo is None)."""
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and strip tzinfo so callers keep receiving a naive value.
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

def isonow():
    """Return the value of utcnow() in ISO format with a trailing 'Z'."""
    timestamp = utcnow().isoformat()
    return f"{timestamp}Z"

def today():
    """Return the current local date and time as a naive datetime."""
    return datetime.datetime.today()

def timedelta(**kwargs):
    """Build a datetime.timedelta from the given keyword arguments."""
    duration = datetime.timedelta(**kwargs)
    return duration

def datetime_obj(*args, **kwargs):
    """Build a datetime.datetime, forwarding all arguments to its constructor."""
    moment = datetime.datetime(*args, **kwargs)
    return moment

def get_lbry_hash_obj():
    """Return a fresh, empty SHA-384 hash object."""
    hasher = hashlib.sha384()
    return hasher

def generate_id(num=None):
    """Return a digest from get_lbry_hash_obj() to use as an identifier.

    When num is given the digest is derived from it, so the same num always
    yields the same id. Otherwise the digest is taken over 512 random bits.
    """
    h = get_lbry_hash_obj()
    # NOTE(review): the seed derivation below (text form of the number, encoded
    # with the default codec) should be confirmed against existing callers.
    if num is not None:
        seed = str(num)
    else:
        seed = str(random.getrandbits(512))
    h.update(seed.encode())
    return h.digest()

def version_is_greater_than(version_a, version_b):
    """Returns True if version a is more recent than version b"""
    parsed_a = pkg_resources.parse_version(version_a)
    parsed_b = pkg_resources.parse_version(version_b)
    return parsed_a > parsed_b

def rot13(some_str):
    """Return some_str with the ROT13 substitution applied to its letters."""
    rotated = codecs.encode(some_str, encoding='rot_13')
    return rotated

def deobfuscate(obfustacated):
    """Apply rot13 to the input, base64-decode the result and return it as text."""
    encoded = rot13(obfustacated)
    raw = base64.b64decode(encoded)
    return raw.decode()

def obfuscate(plain):
    """Base64-encode the bytes in plain and return the rot13 of that text."""
    encoded = base64.b64encode(plain).decode()
    return rot13(encoded)

def check_connection(server="", port=80, timeout=5) -> bool:
    """Attempts to open a socket to server:port and returns True if successful.

    If the host name cannot be resolved, one more connection is attempted
    against a fallback host on port 53 so that a name-resolution failure
    alone is not reported as being offline. Any other OSError returns False.
    """
    log.debug('Checking connection to %s:%s', server, port)
    try:
        server = socket.gethostbyname(server)
        socket.create_connection((server, port), timeout).close()
        return True
    except (socket.gaierror, socket.herror):
        log.debug("Failed to connect to %s:%s. Unable to resolve domain. Trying to bypass DNS",
                  server, port)
        try:
            # NOTE(review): the fallback host is an empty string; a literal IP
            # address reachable without DNS is presumably intended -- confirm.
            server = ""
            port = 53
            socket.create_connection((server, port), timeout).close()
            return True
        except OSError:
            return False
    except OSError:
        return False

async def async_check_connection(server="", port=80, timeout=1) -> bool:
    """Run check_connection in the loop's default executor and return its result."""
    loop = asyncio.get_event_loop()
    reachable = await loop.run_in_executor(None, check_connection, server, port, timeout)
    return reachable

def random_string(length=10, chars=string.ascii_lowercase):
    """Return a string of `length` characters, each drawn with random.choice from `chars`."""
    picked = []
    for _ in range(length):
        picked.append(random.choice(chars))
    return ''.join(picked)

def short_hash(hash_str):
    """Return at most the first six items of hash_str."""
    prefix_length = 6
    return hash_str[:prefix_length]

def get_sd_hash(stream_info):
    """Return the sd_hash for stream_info, or None when it is empty.

    stream_info is either a Claim or a dict nested as
    claim -> value -> stream -> source. A warning is logged when the dict
    form yields no value.
    """
    if not stream_info:
        return None
    if isinstance(stream_info, Claim):
        # NOTE(review): attribute path assumed from the Claim schema -- confirm.
        return stream_info.stream.source.sd_hash
    # NOTE(review): the innermost key is assumed to be 'source' -- confirm.
    result = stream_info.get('claim', {}).\
        get('value', {}).\
        get('stream', {}).\
        get('source', {}).\
        get('source')
    if not result:
        log.warning("Unable to get sd_hash")
    return result

def json_dumps_pretty(obj, **kwargs):
    """Serialize obj to JSON text with sorted keys and two-space indentation.

    Extra keyword arguments are forwarded to json.dumps.
    """
    rendered = json.dumps(
        obj,
        sort_keys=True,
        indent=2,
        separators=(',', ': '),
        **kwargs
    )
    return rendered

def cancel_task(task: typing.Optional[asyncio.Task]):
    """Cancel task if it is set and has not finished yet; otherwise do nothing."""
    if task and not task.done():
        task.cancel()

def cancel_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]):
    """Call cancel_task on every entry of tasks, leaving the list itself untouched."""
    for task in tasks:
        cancel_task(task)

def drain_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]):
    """Pop entries off tasks until it is empty, calling cancel_task on each one."""
    while tasks:
        cancel_task(tasks.pop())

def async_timed_cache(duration: int):
    """Decorator factory that caches an async function's results for `duration` seconds.

    Results are keyed by the positional arguments and the keyword arguments in
    the order they were passed, so all arguments must be hashable. Age is
    measured with the running event loop's clock. Exceptions are not cached.
    """
    def wrapper(func):
        # key -> (cached result, loop time at which the call started)
        cache: typing.Dict[typing.Tuple,
                           typing.Tuple[typing.Any, float]] = {}

        # Keep the wrapped coroutine function's name and docstring.
        @functools.wraps(func)
        async def _inner(*args, **kwargs) -> typing.Any:
            loop = asyncio.get_running_loop()
            time_now = loop.time()
            key = tuple([args, tuple([tuple([k, kwargs[k]]) for k in kwargs])])
            if key in cache and (time_now - cache[key][1] < duration):
                return cache[key][0]
            to_cache = await func(*args, **kwargs)
            cache[key] = to_cache, time_now
            return to_cache
        return _inner
    return wrapper

def cache_concurrent(async_fn):
    """
    When the decorated function has concurrent calls made to it with the same arguments, only run it once

    Callers that arrive while a call with the same arguments is in flight await
    the same task. The entry is dropped once that task finishes, so results
    are not kept for later calls. Arguments must be hashable.
    """
    # key -> task for the call currently in flight
    cache: typing.Dict = {}

    @functools.wraps(async_fn)
    async def wrapper(*args, **kwargs):
        key = tuple([args, tuple([tuple([k, kwargs[k]]) for k in kwargs])])
        cache[key] = cache.get(key) or asyncio.create_task(async_fn(*args, **kwargs))
        try:
            return await cache[key]
        finally:
            # Always clear the entry, including on error or cancellation, so a
            # failed call is not handed to later callers.
            cache.pop(key, None)

    return wrapper

async def resolve_host(url: str, port: int, proto: str) -> str:
    """Resolve url to an IP address string for the given port and protocol.

    If url is already an IP literal it is returned unchanged. Otherwise the
    running loop's getaddrinfo is used and the address of the first result is
    returned.

    Raises:
        ValueError: if proto is not 'udp' or 'tcp'.
    """
    if proto not in ['udp', 'tcp']:
        # ValueError is an Exception subclass, so callers that catch Exception still work.
        raise ValueError("invalid protocol")
    try:
        if ipaddress.ip_address(url):
            return url
    except ValueError:
        # Not an IP literal; fall through to a lookup.
        pass
    loop = asyncio.get_running_loop()
    # NOTE(review): no address family is requested, so the first result may be
    # IPv6 -- confirm whether callers expect IPv4 only.
    return (await loop.getaddrinfo(
        url, port,
        proto=socket.IPPROTO_TCP if proto == 'tcp' else socket.IPPROTO_UDP,
        type=socket.SOCK_STREAM if proto == 'tcp' else socket.SOCK_DGRAM,
    ))[0][4][0]

class LRUCache:
    """Fixed-capacity mapping that evicts the least recently used entry.

    Entries live in an OrderedDict ordered from least to most recently used.
    Both get() and set() move the touched key to the most-recent end.
    """
    __slots__ = [
        'capacity',
        'cache'
    ]

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = collections.OrderedDict()

    def get(self, key):
        """Return the value for key and mark it most recently used.

        Raises KeyError if key is not present.
        """
        value = self.cache.pop(key)
        self.cache[key] = value
        return value

    def set(self, key, value):
        """Store value under key as the most recently used entry.

        When key is new and the cache is full, the least recently used entry
        is evicted first.
        """
        try:
            # Remove an existing entry so re-inserting moves it to the end.
            self.cache.pop(key)
        except KeyError:
            if len(self.cache) >= self.capacity:
                self.cache.popitem(last=False)
        self.cache[key] = value

    def __contains__(self, item) -> bool:
        return item in self.cache

def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
                         override_lru_cache: typing.Optional[LRUCache] = None):
    """Decorator factory combining an LRU result cache with in-flight call sharing.

    Completed results are stored in an LRUCache (a new one of `cache_size`
    entries, or `override_lru_cache` when given). Calls that arrive while a
    call with the same arguments is still running await that same task.
    Arguments must be hashable.

    Raises:
        ValueError: if neither a cache size nor an override cache is supplied.
    """
    if not cache_size and override_lru_cache is None:
        raise ValueError("invalid cache size")
    # key -> task for the call currently in flight
    concurrent_cache = {}
    lru_cache = override_lru_cache or LRUCache(cache_size)

    def wrapper(async_fn):

        @functools.wraps(async_fn)
        async def _inner(*args, **kwargs):
            key = tuple([args, tuple([tuple([k, kwargs[k]]) for k in kwargs])])
            if key in lru_cache:
                return lru_cache.get(key)

            concurrent_cache[key] = concurrent_cache.get(key) or asyncio.create_task(async_fn(*args, **kwargs))

            try:
                result = await concurrent_cache[key]
                lru_cache.set(key, result)
                return result
            finally:
                # Drop the in-flight entry on success, error or cancellation so
                # a failed task is never reused by a later call.
                concurrent_cache.pop(key, None)
        return _inner
    return wrapper

def get_ssl_context() -> ssl.SSLContext:
    """Create a default SSL context that points at the certifi CA bundle."""
    # NOTE(review): this context is passed to outgoing client requests, yet
    # Purpose.CLIENT_AUTH is documented as the purpose for server-side sockets;
    # Purpose.SERVER_AUTH may be what is intended -- confirm before changing.
    # NOTE(review): certifi.where() returns the path of a bundle file while
    # `capath` expects a directory; `cafile` may be intended -- confirm.
    return ssl.create_default_context(
        purpose=ssl.Purpose.CLIENT_AUTH, capath=certifi.where()
    )

@contextlib.asynccontextmanager
async def aiohttp_request(method, url, **kwargs) -> typing.AsyncContextManager[aiohttp.ClientResponse]:
    """Async context manager yielding the response of a one-off aiohttp request.

    A new ClientSession is opened for the request; the response and the
    session are both closed when the `async with` block exits. Extra keyword
    arguments are forwarded to session.request.
    """
    # Without the asynccontextmanager decorator this is a bare async generator
    # and cannot be used in `async with`.
    async with aiohttp.ClientSession() as session:
        async with session.request(method, url, ssl=get_ssl_context(), **kwargs) as response:
            yield response

async def get_external_ip() -> typing.Optional[str]:  # used if upnp is disabled or non-functioning
        async with aiohttp_request("get", "") as resp:
            response = await resp.json()
            if response['success']:
                return response['data']['ip']
    except Exception: