import logging
import socket
import functools
import hashlib
import asyncio
import typing
import random
from asyncio.protocols import DatagramProtocol
from asyncio.transports import DatagramTransport

from lbry.dht import constants
from lbry.dht.serialization.bencoding import DecodeError
from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY
from lbry.dht.error import RemoteException, TransportNotConnected
from lbry.dht.protocol.routing_table import TreeRoutingTable
from lbry.dht.protocol.data_store import DictDataStore
from lbry.dht.peer import make_kademlia_peer

if typing.TYPE_CHECKING:
    from lbry.dht.peer import PeerManager, KademliaPeer

log = logging.getLogger(__name__)


OLD_PROTOCOL_ERRORS = {
    "findNode() takes exactly 2 arguments (5 given)": "0.19.1",
    "findValue() takes exactly 2 arguments (5 given)": "0.19.1"
}


class KademliaRPC:
    def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
        self.protocol = protocol
        self.loop = loop
        self.peer_port = peer_port
        self.old_token_secret: bytes = None
        self.token_secret = constants.generate_id()

    def compact_address(self):
        compact_ip = functools.reduce(lambda buff, x: buff + bytearray([int(x)]),
                                      self.protocol.external_ip.split('.'), bytearray())
        compact_port = self.peer_port.to_bytes(2, 'big')
        return compact_ip + compact_port + self.protocol.node_id

    @staticmethod
    def ping():
        return b'pong'

    def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, port: int) -> bytes:
        if len(blob_hash) != constants.HASH_BITS // 8:
            raise ValueError(f"invalid length of blob hash: {len(blob_hash)}")
        if not 0 < port < 65535:
            raise ValueError(f"invalid tcp port: {port}")
        rpc_contact.update_tcp_port(port)
        if not self.verify_token(token, rpc_contact.compact_ip()):
            if self.loop.time() - self.protocol.started_listening_time < constants.TOKEN_SECRET_REFRESH_INTERVAL:
                pass
            else:
                raise ValueError("Invalid token")
        self.protocol.data_store.add_peer_to_blob(
            rpc_contact, blob_hash
        )
        return b'OK'

    def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
        if len(key) != constants.HASH_LENGTH:
            raise ValueError("invalid contact node_id length: %i" % len(key))

        contacts = self.protocol.routing_table.find_close_peers(key, sender_node_id=rpc_contact.node_id)
        contact_triples = []
        for contact in contacts[:constants.K * 2]:
            contact_triples.append((contact.node_id, contact.address, contact.udp_port))
        return contact_triples

    def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0):
        page = page if page > 0 else 0

        if len(key) != constants.HASH_LENGTH:
            raise ValueError("invalid blob_exchange hash length: %i" % len(key))

        response = {
            b'token': self.make_token(rpc_contact.compact_ip()),
        }

        if not page:
            response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.K]

        if self.protocol.protocol_version:
            response[b'protocolVersion'] = self.protocol.protocol_version

        # get peers we have stored for this blob_exchange
        peers = [
            peer.compact_address_tcp()
            for peer in self.protocol.data_store.get_peers_for_blob(key)
            if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
        ]
        # if we don't have k storing peers to return and we have this hash locally, include our contact information
        if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
            peers.append(self.compact_address())
        if not peers:
            response[PAGE_KEY] = 0
        else:
            response[PAGE_KEY] = (len(peers) // (constants.K + 1)) + 1  # how many pages of peers we have for the blob
        if len(peers) > constants.K:
            random.Random(self.protocol.node_id).shuffle(peers)
        if page * constants.K < len(peers):
            response[key] = peers[page * constants.K:page * constants.K + constants.K]
        return response

    def refresh_token(self):  # TODO: this needs to be called periodically
        self.old_token_secret = self.token_secret
        self.token_secret = constants.generate_id()

    def make_token(self, compact_ip):
        h = hashlib.new('sha384')
        h.update(self.token_secret + compact_ip)
        return h.digest()

    def verify_token(self, token, compact_ip):
        h = hashlib.new('sha384')
        h.update(self.token_secret + compact_ip)
        if self.old_token_secret and not token == h.digest():  # TODO: why should we be accepting the previous token?
            h = hashlib.new('sha384')
            h.update(self.old_token_secret + compact_ip)
            if not token == h.digest():
                return False
        return True


class RemoteKademliaRPC:
    """
    Encapsulates RPC calls to remote Peers
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol',
                 peer: 'KademliaPeer'):
        self.loop = loop
        self.peer_tracker = peer_tracker
        self.protocol = protocol
        self.peer = peer

    async def ping(self) -> bytes:
        """
        :return: b'pong'
        """
        response = await self.protocol.send_request(
            self.peer, RequestDatagram.make_ping(self.protocol.node_id)
        )
        return response.response

    async def store(self, blob_hash: bytes) -> bytes:
        """
        :param blob_hash: blob hash as bytes
        :return: b'OK'
        """
        if len(blob_hash) != constants.HASH_BITS // 8:
            raise ValueError(f"invalid length of blob hash: {len(blob_hash)}")
        if not self.protocol.peer_port or not 0 < self.protocol.peer_port < 65535:
            raise ValueError(f"invalid tcp port: {self.protocol.peer_port}")
        token = self.peer_tracker.get_node_token(self.peer.node_id)
        if not token:
            find_value_resp = await self.find_value(blob_hash)
            token = find_value_resp[b'token']
        response = await self.protocol.send_request(
            self.peer, RequestDatagram.make_store(self.protocol.node_id, blob_hash, token, self.protocol.peer_port)
        )
        return response.response

    async def find_node(self, key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
        """
        :return: [(node_id, address, udp_port), ...]
        """
        if len(key) != constants.HASH_BITS // 8:
            raise ValueError(f"invalid length of find node key: {len(key)}")
        response = await self.protocol.send_request(
            self.peer, RequestDatagram.make_find_node(self.protocol.node_id, key)
        )
        return [(node_id, address.decode(), udp_port) for node_id, address, udp_port in response.response]

    async def find_value(self, key: bytes, page: int = 0) -> typing.Union[typing.Dict]:
        """
        :return: {
            b'token': <token bytes>,
            b'contacts': [(node_id, address, udp_port), ...]
            <key bytes>: [<blob_peer_compact_address, ...]
        }
        """
        if len(key) != constants.HASH_BITS // 8:
            raise ValueError(f"invalid length of find value key: {len(key)}")
        response = await self.protocol.send_request(
            self.peer, RequestDatagram.make_find_value(self.protocol.node_id, key, page=page)
        )
        self.peer_tracker.update_token(self.peer.node_id, response.response[b'token'])
        return response.response


class PingQueue:
    def __init__(self, loop: asyncio.AbstractEventLoop, protocol: 'KademliaProtocol'):
        self._loop = loop
        self._protocol = protocol
        self._pending_contacts: typing.Dict['KademliaPeer', float] = {}
        self._process_task: asyncio.Task = None
        self._running = False
        self._running_pings: typing.Set[asyncio.Task] = set()
        self._default_delay = constants.MAYBE_PING_DELAY

    @property
    def running(self):
        return self._running

    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
        delay = delay if delay is not None else self._default_delay
        now = self._loop.time()
        for peer in peers:
            if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]:
                self._pending_contacts[peer] = delay + now

    def maybe_ping(self, peer: 'KademliaPeer'):
        async def ping_task():
            try:
                if self._protocol.peer_manager.peer_is_good(peer):
                    if peer not in self._protocol.routing_table.get_peers():
                        self._protocol.add_peer(peer)
                    return
                await self._protocol.get_rpc_peer(peer).ping()
            except (asyncio.TimeoutError, RemoteException):
                pass

        task = self._loop.create_task(ping_task())
        task.add_done_callback(lambda _: None if task not in self._running_pings else self._running_pings.remove(task))
        self._running_pings.add(task)

    async def _process(self):  # send up to 1 ping per second
        while True:
            enqueued = list(self._pending_contacts.keys())
            now = self._loop.time()
            for peer in enqueued:
                if self._pending_contacts[peer] <= now:
                    del self._pending_contacts[peer]
                    self.maybe_ping(peer)
                    break
            await asyncio.sleep(1, loop=self._loop)

    def start(self):
        assert not self._running
        self._running = True
        if not self._process_task:
            self._process_task = self._loop.create_task(self._process())

    def stop(self):
        assert self._running
        self._running = False
        if self._process_task:
            self._process_task.cancel()
            self._process_task = None
        while self._running_pings:
            self._running_pings.pop().cancel()


class KademliaProtocol(DatagramProtocol):
    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
                 udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
        self.peer_manager = peer_manager
        self.loop = loop
        self.node_id = node_id
        self.external_ip = external_ip
        self.udp_port = udp_port
        self.peer_port = peer_port
        self.is_seed_node = False
        self.partial_messages: typing.Dict[bytes, typing.Dict[bytes, bytes]] = {}
        self.sent_messages: typing.Dict[bytes, typing.Tuple['KademliaPeer', asyncio.Future, RequestDatagram]] = {}
        self.protocol_version = constants.PROTOCOL_VERSION
        self.started_listening_time = 0
        self.transport: DatagramTransport = None
        self.old_token_secret = constants.generate_id()
        self.token_secret = constants.generate_id()
        self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id, split_buckets_under_index)
        self.data_store = DictDataStore(self.loop, self.peer_manager)
        self.ping_queue = PingQueue(self.loop, self)
        self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
        self.rpc_timeout = rpc_timeout
        self._split_lock = asyncio.Lock(loop=self.loop)
        self._to_remove: typing.Set['KademliaPeer'] = set()
        self._to_add: typing.Set['KademliaPeer'] = set()
        self._wakeup_routing_task = asyncio.Event(loop=self.loop)
        self.maintaing_routing_task: typing.Optional[asyncio.Task] = None

    @functools.lru_cache(128)
    def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
        return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer)

    def start(self):
        self.maintaing_routing_task = self.loop.create_task(self.routing_table_task())

    def stop(self):
        if self.maintaing_routing_task:
            self.maintaing_routing_task.cancel()
        if self.transport:
            self.disconnect()

    def disconnect(self):
        self.transport.close()

    def connection_made(self, transport: DatagramTransport):
        self.transport = transport

    def connection_lost(self, exc):
        self.stop()

    @staticmethod
    def _migrate_incoming_rpc_args(peer: 'KademliaPeer', method: bytes, *args) -> typing.Tuple[typing.Tuple,
                                                                                               typing.Dict]:
        if method == b'store' and peer.protocol_version == 0:
            if isinstance(args[1], dict):
                blob_hash = args[0]
                token = args[1].pop(b'token', None)
                port = args[1].pop(b'port', -1)
                original_publisher_id = args[1].pop(b'lbryid', None)
                age = 0
                return (blob_hash, token, port, original_publisher_id, age), {}
        return args, {}

    async def _add_peer(self, peer: 'KademliaPeer'):
        if not peer.node_id:
            log.warning("Tried adding a peer with no node id!")
            return False
        for my_peer in self.routing_table.get_peers():
            if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
                self.routing_table.remove_peer(my_peer)
                self.routing_table.join_buckets()
        bucket_index = self.routing_table.kbucket_index(peer.node_id)
        if self.routing_table.buckets[bucket_index].add_peer(peer):
            return True

        # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
        if self.routing_table.should_split(bucket_index, peer.node_id):
            self.routing_table.split_bucket(bucket_index)
            # Retry the insertion attempt
            result = await self._add_peer(peer)
            self.routing_table.join_buckets()
            return result
        else:
            # We can't split the k-bucket
            #
            # The 13 page kademlia paper specifies that the least recently contacted node in the bucket
            # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
            # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
            #
            # A reasonable extension to this is BEP 0005, which extends the above:
            #
            #    Not all nodes that we learn about are equal. Some are "good" and some are not.
            #    Many nodes using the DHT are able to send queries and receive responses,
            #    but are not able to respond to queries from other nodes. It is important that
            #    each node's routing table must contain only known good nodes. A good node is
            #    a node has responded to one of our queries within the last 15 minutes. A node
            #    is also good if it has ever responded to one of our queries and has sent us a
            #    query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
            #    questionable. Nodes become bad when they fail to respond to multiple queries
            #    in a row. Nodes that we know are good are given priority over nodes with unknown status.
            #
            # When there are bad or questionable nodes in the bucket, the least recent is selected for
            # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
            # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
            # is ignored if the pinged node replies.

            not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
            not_recently_replied = []
            for my_peer in not_good_contacts:
                last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
                if not last_replied or last_replied + 60 < self.loop.time():
                    not_recently_replied.append(my_peer)
            if not_recently_replied:
                to_replace = not_recently_replied[0]
            else:
                to_replace = self.routing_table.buckets[bucket_index].peers[0]
                last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
                if last_replied and last_replied + 60 > self.loop.time():
                    return False
            log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
            try:
                to_replace_rpc = self.get_rpc_peer(to_replace)
                await to_replace_rpc.ping()
                return False
            except asyncio.TimeoutError:
                log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
                          to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
                if to_replace in self.routing_table.buckets[bucket_index]:
                    self.routing_table.buckets[bucket_index].remove_peer(to_replace)
                return await self._add_peer(peer)

    def add_peer(self, peer: 'KademliaPeer'):
        if peer.node_id == self.node_id:
            return False
        self._to_add.add(peer)
        self._wakeup_routing_task.set()

    def remove_peer(self, peer: 'KademliaPeer'):
        self._to_remove.add(peer)
        self._wakeup_routing_task.set()

    async def routing_table_task(self):
        while True:
            while self._to_remove:
                async with self._split_lock:
                    peer = self._to_remove.pop()
                    self.routing_table.remove_peer(peer)
                    self.routing_table.join_buckets()
            while self._to_add:
                async with self._split_lock:
                    await self._add_peer(self._to_add.pop())
            await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop)
            self._wakeup_routing_task.clear()

    def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
        assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8],
                                                        self.node_id.hex()[:8])
        method = message.method
        if method not in [b'ping', b'store', b'findNode', b'findValue']:
            raise AttributeError('Invalid method: %s' % message.method.decode())
        if message.args and isinstance(message.args[-1], dict) and b'protocolVersion' in message.args[-1]:
            # args don't need reformatting
            args, kwargs = tuple(message.args[:-1]), message.args[-1]
        else:
            args, kwargs = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args)
        log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
                  sender_contact.address, sender_contact.udp_port)

        if method == b'ping':
            result = self.node_rpc.ping()
        elif method == b'store':
            blob_hash, token, port, original_publisher_id, age = args[:5]  # pylint: disable=unused-variable
            result = self.node_rpc.store(sender_contact, blob_hash, token, port)
        else:
            key = args[0]
            page = kwargs.get(PAGE_KEY, 0)
            if method == b'findNode':
                result = self.node_rpc.find_node(sender_contact, key)
            else:
                assert method == b'findValue'
                result = self.node_rpc.find_value(sender_contact, key, page)

        self.send_response(
            sender_contact, ResponseDatagram(RESPONSE_TYPE, message.rpc_id, self.node_id, result),
        )

    def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
        # This is an RPC method request
        self.peer_manager.report_last_requested(address[0], address[1])
        try:
            peer = self.routing_table.get_peer(request_datagram.node_id)
        except IndexError:
            try:
                peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
            except ValueError as err:
                log.warning("error replying to %s: %s", address[0], str(err))
                return
        try:
            self._handle_rpc(peer, request_datagram)
            # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it
            # will be added to our routing table if successful
            is_good = self.peer_manager.peer_is_good(peer)
            if is_good is None:
                self.ping_queue.enqueue_maybe_ping(peer)
            # only add a requesting contact to the routing table if it has replied to one of our requests
            elif is_good is True:
                self.add_peer(peer)
        except ValueError as err:
            log.debug("error raised handling %s request from %s:%i - %s(%s)",
                      request_datagram.method, peer.address, peer.udp_port, str(type(err)),
                      str(err))
            self.send_error(
                peer,
                ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(),
                              str(err).encode())
            )
        except Exception as err:
            log.warning("error raised handling %s request from %s:%i - %s(%s)",
                        request_datagram.method, peer.address, peer.udp_port, str(type(err)),
                        str(err))
            self.send_error(
                peer,
                ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(),
                              str(err).encode())
            )

    def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram):
        # Find the message that triggered this response
        if response_datagram.rpc_id in self.sent_messages:
            peer, future, _ = self.sent_messages[response_datagram.rpc_id]
            if peer.address != address[0]:
                future.set_exception(
                    RemoteException(f"response from {address[0]}, expected {peer.address}")
                )
                return

            # We got a result from the RPC
            if peer.node_id == self.node_id:
                future.set_exception(RemoteException("node has our node id"))
                return
            elif response_datagram.node_id == self.node_id:
                future.set_exception(RemoteException("incoming message is from our node id"))
                return
            peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1])
            self.peer_manager.report_last_replied(address[0], address[1])
            self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
            if not future.cancelled():
                future.set_result(response_datagram)
                self.add_peer(peer)
            else:
                log.warning("%s:%i replied, but after we cancelled the request attempt",
                            peer.address, peer.udp_port)
        else:
            # If the original message isn't found, it must have timed out
            # TODO: we should probably do something with this...
            pass

    def handle_error_datagram(self, address, error_datagram: ErrorDatagram):
        # The RPC request raised a remote exception; raise it locally
        remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})")
        if error_datagram.rpc_id in self.sent_messages:
            peer, future, request = self.sent_messages.pop(error_datagram.rpc_id)
            if (peer.address, peer.udp_port) != address:
                future.set_exception(
                    RemoteException(
                        f"response from {address[0]}:{address[1]}, "
                        f"expected {peer.address}:{peer.udp_port}"
                    )
                )
                return
            error_msg = f"" \
                f"Error sending '{request.method}' to {peer.address}:{peer.udp_port}\n" \
                f"Args: {request.args}\n" \
                f"Raised: {str(remote_exception)}"
            if 'Invalid token' in error_msg:
                log.debug(error_msg)
            elif error_datagram.response not in OLD_PROTOCOL_ERRORS:
                log.warning(error_msg)
            else:
                log.debug(
                    "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
                    peer.address, peer.udp_port, OLD_PROTOCOL_ERRORS[error_datagram.response]
                )
            future.set_exception(remote_exception)
            return
        else:
            if error_datagram.response not in OLD_PROTOCOL_ERRORS:
                msg = f"Received error from {address[0]}:{address[1]}, but it isn't in response to a " \
                    f"pending request: {str(remote_exception)}"
                log.warning(msg)
            else:
                log.debug(
                    "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
                    address[0], address[1], OLD_PROTOCOL_ERRORS[error_datagram.response]
                )

    def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None:  # pylint: disable=arguments-renamed
        try:
            message = decode_datagram(datagram)
        except (ValueError, TypeError, DecodeError):
            self.peer_manager.report_failure(address[0], address[1])
            log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex())
            return

        if isinstance(message, RequestDatagram):
            self.handle_request_datagram(address, message)
        elif isinstance(message, ErrorDatagram):
            self.handle_error_datagram(address, message)
        else:
            assert isinstance(message, ResponseDatagram), "sanity"
            self.handle_response_datagram(address, message)

    async def send_request(self, peer: 'KademliaPeer', request: RequestDatagram) -> ResponseDatagram:
        self._send(peer, request)
        response_fut = self.sent_messages[request.rpc_id][1]
        try:
            response = await asyncio.wait_for(response_fut, self.rpc_timeout)
            self.peer_manager.report_last_replied(peer.address, peer.udp_port)
            return response
        except asyncio.CancelledError:
            if not response_fut.done():
                response_fut.cancel()
            raise
        except (asyncio.TimeoutError, RemoteException):
            self.peer_manager.report_failure(peer.address, peer.udp_port)
            if self.peer_manager.peer_is_good(peer) is False:
                self.remove_peer(peer)
            raise

    def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram):
        self._send(peer, response)

    def send_error(self, peer: 'KademliaPeer', error: ErrorDatagram):
        self._send(peer, error)

    def _send(self, peer: 'KademliaPeer', message: typing.Union[RequestDatagram, ResponseDatagram, ErrorDatagram]):
        if not self.transport or self.transport.is_closing():
            raise TransportNotConnected()

        data = message.bencode()
        if len(data) > constants.MSG_SIZE_LIMIT:
            log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)",
                        constants.MSG_SIZE_LIMIT, len(data))
            log.debug("Packet is too large to send: %s", data[:3500].hex())
            raise ValueError(
                f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)"
            )
        if isinstance(message, (RequestDatagram, ResponseDatagram)):
            assert message.node_id == self.node_id, message
            if isinstance(message, RequestDatagram):
                assert self.node_id != peer.node_id

        def pop_from_sent_messages(_):
            if message.rpc_id in self.sent_messages:
                self.sent_messages.pop(message.rpc_id)

        if isinstance(message, RequestDatagram):
            response_fut = self.loop.create_future()
            response_fut.add_done_callback(pop_from_sent_messages)
            self.sent_messages[message.rpc_id] = (peer, response_fut, message)
        try:
            self.transport.sendto(data, (peer.address, peer.udp_port))
        except OSError as err:
            # TODO: handle ENETUNREACH
            if err.errno == socket.EWOULDBLOCK:
                # i'm scared this may swallow important errors, but i get a million of these
                # on Linux and it doesn't seem to affect anything  -grin
                log.warning("Can't send data to dht: EWOULDBLOCK")
            else:
                log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)",
                          len(data), peer.address, peer.udp_port, str(err), err.errno)
            if isinstance(message, RequestDatagram):
                self.sent_messages[message.rpc_id][1].set_exception(err)
            else:
                raise err
        if isinstance(message, RequestDatagram):
            self.peer_manager.report_last_sent(peer.address, peer.udp_port)
        elif isinstance(message, ErrorDatagram):
            self.peer_manager.report_failure(peer.address, peer.udp_port)

    def change_token(self):
        self.old_token_secret = self.token_secret
        self.token_secret = constants.generate_id()

    def make_token(self, compact_ip):
        return constants.digest(self.token_secret + compact_ip)

    def verify_token(self, token, compact_ip):
        h = constants.HASH_CLASS()
        h.update(self.token_secret + compact_ip)
        if self.old_token_secret and not token == h.digest():  # TODO: why should we be accepting the previous token?
            h = constants.HASH_CLASS()
            h.update(self.old_token_secret + compact_ip)
            if not token == h.digest():
                return False
        return True

    async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer',  # pylint: disable=too-many-return-statements
                            retry: bool = True) -> typing.Tuple[bytes, bool]:
        async def __store():
            res = await self.get_rpc_peer(peer).store(hash_value)
            if res != b"OK":
                raise ValueError(res)
            log.debug("Stored %s to %s", hash_value.hex()[:8], peer)
            return peer.node_id, True

        try:
            return await __store()
        except asyncio.TimeoutError:
            log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer)
            return peer.node_id, False
        except ValueError as err:
            log.error("Unexpected response: %s", err)
            return peer.node_id, False
        except RemoteException as err:
            if 'findValue() takes exactly 2 arguments (5 given)' in str(err):
                log.debug("peer %s:%i is running an incompatible version of lbrynet", peer.address, peer.udp_port)
                return peer.node_id, False
            if 'Invalid token' not in str(err):
                log.warning("Unexpected error while storing blob_hash: %s", err)
                return peer.node_id, False
        self.peer_manager.clear_token(peer.node_id)
        if not retry:
            return peer.node_id, False
        return await self.store_to_peer(hash_value, peer, retry=False)