720 lines
33 KiB
Python
720 lines
33 KiB
Python
import logging
|
|
import socket
|
|
import functools
|
|
import hashlib
|
|
import asyncio
|
|
import time
|
|
import typing
|
|
import random
|
|
from asyncio.protocols import DatagramProtocol
|
|
from asyncio.transports import DatagramTransport
|
|
|
|
from prometheus_client import Gauge, Counter, Histogram
|
|
|
|
from lbry.dht import constants
|
|
from lbry.dht.serialization.bencoding import DecodeError
|
|
from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
|
|
from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY
|
|
from lbry.dht.error import RemoteException, TransportNotConnected
|
|
from lbry.dht.protocol.routing_table import TreeRoutingTable
|
|
from lbry.dht.protocol.data_store import DictDataStore
|
|
from lbry.dht.peer import make_kademlia_peer
|
|
|
|
if typing.TYPE_CHECKING:
|
|
from lbry.dht.peer import PeerManager, KademliaPeer
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
OLD_PROTOCOL_ERRORS = {
|
|
"findNode() takes exactly 2 arguments (5 given)": "0.19.1",
|
|
"findValue() takes exactly 2 arguments (5 given)": "0.19.1"
|
|
}
|
|
|
|
|
|
class KademliaRPC:
|
|
stored_blob_metric = Gauge(
|
|
"stored_blobs", "Number of blobs announced by other peers", namespace="dht_node",
|
|
labelnames=("scope",),
|
|
)
|
|
|
|
def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
|
|
self.protocol = protocol
|
|
self.loop = loop
|
|
self.peer_port = peer_port
|
|
self.old_token_secret: bytes = None
|
|
self.token_secret = constants.generate_id()
|
|
|
|
def compact_address(self):
|
|
compact_ip = functools.reduce(lambda buff, x: buff + bytearray([int(x)]),
|
|
self.protocol.external_ip.split('.'), bytearray())
|
|
compact_port = self.peer_port.to_bytes(2, 'big')
|
|
return compact_ip + compact_port + self.protocol.node_id
|
|
|
|
@staticmethod
|
|
def ping():
|
|
return b'pong'
|
|
|
|
def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, port: int) -> bytes:
|
|
if len(blob_hash) != constants.HASH_BITS // 8:
|
|
raise ValueError(f"invalid length of blob hash: {len(blob_hash)}")
|
|
if not 0 < port < 65535:
|
|
raise ValueError(f"invalid tcp port: {port}")
|
|
rpc_contact.update_tcp_port(port)
|
|
if not self.verify_token(token, rpc_contact.compact_ip()):
|
|
if self.loop.time() - self.protocol.started_listening_time < constants.TOKEN_SECRET_REFRESH_INTERVAL:
|
|
pass
|
|
else:
|
|
raise ValueError("Invalid token")
|
|
self.protocol.data_store.add_peer_to_blob(
|
|
rpc_contact, blob_hash
|
|
)
|
|
self.stored_blob_metric.labels("global").set(len(self.protocol.data_store))
|
|
return b'OK'
|
|
|
|
def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
|
|
if len(key) != constants.HASH_LENGTH:
|
|
raise ValueError("invalid contact node_id length: %i" % len(key))
|
|
|
|
contacts = self.protocol.routing_table.find_close_peers(key, sender_node_id=rpc_contact.node_id)
|
|
contact_triples = []
|
|
for contact in contacts[:constants.K * 2]:
|
|
contact_triples.append((contact.node_id, contact.address, contact.udp_port))
|
|
return contact_triples
|
|
|
|
def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0):
|
|
page = page if page > 0 else 0
|
|
|
|
if len(key) != constants.HASH_LENGTH:
|
|
raise ValueError("invalid blob_exchange hash length: %i" % len(key))
|
|
|
|
response = {
|
|
b'token': self.make_token(rpc_contact.compact_ip()),
|
|
}
|
|
|
|
if not page:
|
|
response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.K]
|
|
|
|
if self.protocol.protocol_version:
|
|
response[b'protocolVersion'] = self.protocol.protocol_version
|
|
|
|
# get peers we have stored for this blob_exchange
|
|
peers = [
|
|
peer.compact_address_tcp()
|
|
for peer in self.protocol.data_store.get_peers_for_blob(key)
|
|
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
|
|
]
|
|
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
|
if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
|
|
peers.append(self.compact_address())
|
|
if not peers:
|
|
response[PAGE_KEY] = 0
|
|
else:
|
|
response[PAGE_KEY] = (len(peers) // (constants.K + 1)) + 1 # how many pages of peers we have for the blob
|
|
if len(peers) > constants.K:
|
|
random.Random(self.protocol.node_id).shuffle(peers)
|
|
if page * constants.K < len(peers):
|
|
response[key] = peers[page * constants.K:page * constants.K + constants.K]
|
|
return response
|
|
|
|
def refresh_token(self): # TODO: this needs to be called periodically
|
|
self.old_token_secret = self.token_secret
|
|
self.token_secret = constants.generate_id()
|
|
|
|
def make_token(self, compact_ip):
|
|
h = hashlib.new('sha384')
|
|
h.update(self.token_secret + compact_ip)
|
|
return h.digest()
|
|
|
|
def verify_token(self, token, compact_ip):
|
|
h = hashlib.new('sha384')
|
|
h.update(self.token_secret + compact_ip)
|
|
if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token?
|
|
h = hashlib.new('sha384')
|
|
h.update(self.old_token_secret + compact_ip)
|
|
if not token == h.digest():
|
|
return False
|
|
return True
|
|
|
|
|
|
class RemoteKademliaRPC:
|
|
"""
|
|
Encapsulates RPC calls to remote Peers
|
|
"""
|
|
|
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol',
|
|
peer: 'KademliaPeer'):
|
|
self.loop = loop
|
|
self.peer_tracker = peer_tracker
|
|
self.protocol = protocol
|
|
self.peer = peer
|
|
|
|
async def ping(self) -> bytes:
|
|
"""
|
|
:return: b'pong'
|
|
"""
|
|
response = await self.protocol.send_request(
|
|
self.peer, RequestDatagram.make_ping(self.protocol.node_id)
|
|
)
|
|
return response.response
|
|
|
|
async def store(self, blob_hash: bytes) -> bytes:
|
|
"""
|
|
:param blob_hash: blob hash as bytes
|
|
:return: b'OK'
|
|
"""
|
|
if len(blob_hash) != constants.HASH_BITS // 8:
|
|
raise ValueError(f"invalid length of blob hash: {len(blob_hash)}")
|
|
if not self.protocol.peer_port or not 0 < self.protocol.peer_port < 65535:
|
|
raise ValueError(f"invalid tcp port: {self.protocol.peer_port}")
|
|
token = self.peer_tracker.get_node_token(self.peer.node_id)
|
|
if not token:
|
|
find_value_resp = await self.find_value(blob_hash)
|
|
token = find_value_resp[b'token']
|
|
response = await self.protocol.send_request(
|
|
self.peer, RequestDatagram.make_store(self.protocol.node_id, blob_hash, token, self.protocol.peer_port)
|
|
)
|
|
return response.response
|
|
|
|
async def find_node(self, key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
|
|
"""
|
|
:return: [(node_id, address, udp_port), ...]
|
|
"""
|
|
if len(key) != constants.HASH_BITS // 8:
|
|
raise ValueError(f"invalid length of find node key: {len(key)}")
|
|
response = await self.protocol.send_request(
|
|
self.peer, RequestDatagram.make_find_node(self.protocol.node_id, key)
|
|
)
|
|
return [(node_id, address.decode(), udp_port) for node_id, address, udp_port in response.response]
|
|
|
|
async def find_value(self, key: bytes, page: int = 0) -> typing.Union[typing.Dict]:
|
|
"""
|
|
:return: {
|
|
b'token': <token bytes>,
|
|
b'contacts': [(node_id, address, udp_port), ...]
|
|
<key bytes>: [<blob_peer_compact_address, ...]
|
|
}
|
|
"""
|
|
if len(key) != constants.HASH_BITS // 8:
|
|
raise ValueError(f"invalid length of find value key: {len(key)}")
|
|
response = await self.protocol.send_request(
|
|
self.peer, RequestDatagram.make_find_value(self.protocol.node_id, key, page=page)
|
|
)
|
|
self.peer_tracker.update_token(self.peer.node_id, response.response[b'token'])
|
|
return response.response
|
|
|
|
|
|
class PingQueue:
|
|
def __init__(self, loop: asyncio.AbstractEventLoop, protocol: 'KademliaProtocol'):
|
|
self._loop = loop
|
|
self._protocol = protocol
|
|
self._pending_contacts: typing.Dict['KademliaPeer', float] = {}
|
|
self._process_task: asyncio.Task = None
|
|
self._running = False
|
|
self._running_pings: typing.Set[asyncio.Task] = set()
|
|
self._default_delay = constants.MAYBE_PING_DELAY
|
|
|
|
@property
|
|
def running(self):
|
|
return self._running
|
|
|
|
def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
|
|
delay = delay if delay is not None else self._default_delay
|
|
now = self._loop.time()
|
|
for peer in peers:
|
|
if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]:
|
|
self._pending_contacts[peer] = delay + now
|
|
|
|
def maybe_ping(self, peer: 'KademliaPeer'):
|
|
async def ping_task():
|
|
try:
|
|
if self._protocol.peer_manager.peer_is_good(peer):
|
|
if peer not in self._protocol.routing_table.get_peers():
|
|
self._protocol.add_peer(peer)
|
|
return
|
|
await self._protocol.get_rpc_peer(peer).ping()
|
|
except (asyncio.TimeoutError, RemoteException):
|
|
pass
|
|
|
|
task = self._loop.create_task(ping_task())
|
|
task.add_done_callback(lambda _: None if task not in self._running_pings else self._running_pings.remove(task))
|
|
self._running_pings.add(task)
|
|
|
|
async def _process(self): # send up to 1 ping per second
|
|
while True:
|
|
enqueued = list(self._pending_contacts.keys())
|
|
now = self._loop.time()
|
|
for peer in enqueued:
|
|
if self._pending_contacts[peer] <= now:
|
|
del self._pending_contacts[peer]
|
|
self.maybe_ping(peer)
|
|
break
|
|
await asyncio.sleep(1, loop=self._loop)
|
|
|
|
def start(self):
|
|
assert not self._running
|
|
self._running = True
|
|
if not self._process_task:
|
|
self._process_task = self._loop.create_task(self._process())
|
|
|
|
def stop(self):
|
|
assert self._running
|
|
self._running = False
|
|
if self._process_task:
|
|
self._process_task.cancel()
|
|
self._process_task = None
|
|
while self._running_pings:
|
|
self._running_pings.pop().cancel()
|
|
|
|
|
|
class KademliaProtocol(DatagramProtocol):
|
|
request_sent_metric = Counter(
|
|
"request_sent", "Number of requests send from DHT RPC protocol", namespace="dht_node",
|
|
labelnames=("method",),
|
|
)
|
|
request_success_metric = Counter(
|
|
"request_success", "Number of successful requests", namespace="dht_node",
|
|
labelnames=("method",),
|
|
)
|
|
HISTOGRAM_BUCKETS = (
|
|
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf')
|
|
)
|
|
response_time_metric = Histogram(
|
|
"response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS,
|
|
labelnames=("method",)
|
|
)
|
|
received_request_metric = Counter(
|
|
"received_request", "Number of received DHT RPC requests", namespace="dht_node",
|
|
labelnames=("method",),
|
|
)
|
|
|
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
|
|
udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
|
|
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
|
|
self.peer_manager = peer_manager
|
|
self.loop = loop
|
|
self.node_id = node_id
|
|
self.external_ip = external_ip
|
|
self.udp_port = udp_port
|
|
self.peer_port = peer_port
|
|
self.is_seed_node = False
|
|
self.partial_messages: typing.Dict[bytes, typing.Dict[bytes, bytes]] = {}
|
|
self.sent_messages: typing.Dict[bytes, typing.Tuple['KademliaPeer', asyncio.Future, RequestDatagram]] = {}
|
|
self.protocol_version = constants.PROTOCOL_VERSION
|
|
self.started_listening_time = 0
|
|
self.transport: DatagramTransport = None
|
|
self.old_token_secret = constants.generate_id()
|
|
self.token_secret = constants.generate_id()
|
|
self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id, split_buckets_under_index)
|
|
self.data_store = DictDataStore(self.loop, self.peer_manager)
|
|
self.ping_queue = PingQueue(self.loop, self)
|
|
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
|
|
self.rpc_timeout = rpc_timeout
|
|
self._split_lock = asyncio.Lock(loop=self.loop)
|
|
self._to_remove: typing.Set['KademliaPeer'] = set()
|
|
self._to_add: typing.Set['KademliaPeer'] = set()
|
|
self._wakeup_routing_task = asyncio.Event(loop=self.loop)
|
|
self.maintaing_routing_task: typing.Optional[asyncio.Task] = None
|
|
|
|
@functools.lru_cache(128)
|
|
def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
|
|
return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer)
|
|
|
|
def start(self):
|
|
self.maintaing_routing_task = self.loop.create_task(self.routing_table_task())
|
|
|
|
def stop(self):
|
|
if self.maintaing_routing_task:
|
|
self.maintaing_routing_task.cancel()
|
|
if self.transport:
|
|
self.disconnect()
|
|
|
|
def disconnect(self):
|
|
self.transport.close()
|
|
|
|
def connection_made(self, transport: DatagramTransport):
|
|
self.transport = transport
|
|
|
|
def connection_lost(self, exc):
|
|
self.stop()
|
|
|
|
@staticmethod
|
|
def _migrate_incoming_rpc_args(peer: 'KademliaPeer', method: bytes, *args) -> typing.Tuple[typing.Tuple,
|
|
typing.Dict]:
|
|
if method == b'store' and peer.protocol_version == 0:
|
|
if isinstance(args[1], dict):
|
|
blob_hash = args[0]
|
|
token = args[1].pop(b'token', None)
|
|
port = args[1].pop(b'port', -1)
|
|
original_publisher_id = args[1].pop(b'lbryid', None)
|
|
age = 0
|
|
return (blob_hash, token, port, original_publisher_id, age), {}
|
|
return args, {}
|
|
|
|
async def _add_peer(self, peer: 'KademliaPeer'):
|
|
if not peer.node_id:
|
|
log.warning("Tried adding a peer with no node id!")
|
|
return False
|
|
for my_peer in self.routing_table.get_peers():
|
|
if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
|
|
self.routing_table.remove_peer(my_peer)
|
|
self.routing_table.join_buckets()
|
|
bucket_index = self.routing_table.kbucket_index(peer.node_id)
|
|
if self.routing_table.buckets[bucket_index].add_peer(peer):
|
|
return True
|
|
|
|
# The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
|
|
if self.routing_table.should_split(bucket_index, peer.node_id):
|
|
self.routing_table.split_bucket(bucket_index)
|
|
# Retry the insertion attempt
|
|
result = await self._add_peer(peer)
|
|
self.routing_table.join_buckets()
|
|
return result
|
|
else:
|
|
# We can't split the k-bucket
|
|
#
|
|
# The 13 page kademlia paper specifies that the least recently contacted node in the bucket
|
|
# shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
|
|
# the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
|
|
#
|
|
# A reasonable extension to this is BEP 0005, which extends the above:
|
|
#
|
|
# Not all nodes that we learn about are equal. Some are "good" and some are not.
|
|
# Many nodes using the DHT are able to send queries and receive responses,
|
|
# but are not able to respond to queries from other nodes. It is important that
|
|
# each node's routing table must contain only known good nodes. A good node is
|
|
# a node has responded to one of our queries within the last 15 minutes. A node
|
|
# is also good if it has ever responded to one of our queries and has sent us a
|
|
# query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
|
|
# questionable. Nodes become bad when they fail to respond to multiple queries
|
|
# in a row. Nodes that we know are good are given priority over nodes with unknown status.
|
|
#
|
|
# When there are bad or questionable nodes in the bucket, the least recent is selected for
|
|
# potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
|
|
# contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
|
|
# is ignored if the pinged node replies.
|
|
|
|
not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
|
|
not_recently_replied = []
|
|
for my_peer in not_good_contacts:
|
|
last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
|
|
if not last_replied or last_replied + 60 < self.loop.time():
|
|
not_recently_replied.append(my_peer)
|
|
if not_recently_replied:
|
|
to_replace = not_recently_replied[0]
|
|
else:
|
|
to_replace = self.routing_table.buckets[bucket_index].peers[0]
|
|
last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
|
|
if last_replied and last_replied + 60 > self.loop.time():
|
|
return False
|
|
log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
|
|
try:
|
|
to_replace_rpc = self.get_rpc_peer(to_replace)
|
|
await to_replace_rpc.ping()
|
|
return False
|
|
except asyncio.TimeoutError:
|
|
log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
|
|
to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
|
|
if to_replace in self.routing_table.buckets[bucket_index]:
|
|
self.routing_table.buckets[bucket_index].remove_peer(to_replace)
|
|
return await self._add_peer(peer)
|
|
|
|
def add_peer(self, peer: 'KademliaPeer'):
|
|
if peer.node_id == self.node_id:
|
|
return False
|
|
self._to_add.add(peer)
|
|
self._wakeup_routing_task.set()
|
|
|
|
def remove_peer(self, peer: 'KademliaPeer'):
|
|
self._to_remove.add(peer)
|
|
self._wakeup_routing_task.set()
|
|
|
|
async def routing_table_task(self):
|
|
while True:
|
|
while self._to_remove:
|
|
async with self._split_lock:
|
|
peer = self._to_remove.pop()
|
|
self.routing_table.remove_peer(peer)
|
|
self.routing_table.join_buckets()
|
|
while self._to_add:
|
|
async with self._split_lock:
|
|
await self._add_peer(self._to_add.pop())
|
|
await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop)
|
|
self._wakeup_routing_task.clear()
|
|
|
|
def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
|
|
assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8],
|
|
self.node_id.hex()[:8])
|
|
method = message.method
|
|
if method not in [b'ping', b'store', b'findNode', b'findValue']:
|
|
raise AttributeError('Invalid method: %s' % message.method.decode())
|
|
if message.args and isinstance(message.args[-1], dict) and b'protocolVersion' in message.args[-1]:
|
|
# args don't need reformatting
|
|
args, kwargs = tuple(message.args[:-1]), message.args[-1]
|
|
else:
|
|
args, kwargs = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args)
|
|
log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
|
|
sender_contact.address, sender_contact.udp_port)
|
|
|
|
if method == b'ping':
|
|
result = self.node_rpc.ping()
|
|
elif method == b'store':
|
|
blob_hash, token, port, original_publisher_id, age = args[:5] # pylint: disable=unused-variable
|
|
result = self.node_rpc.store(sender_contact, blob_hash, token, port)
|
|
else:
|
|
key = args[0]
|
|
page = kwargs.get(PAGE_KEY, 0)
|
|
if method == b'findNode':
|
|
result = self.node_rpc.find_node(sender_contact, key)
|
|
else:
|
|
assert method == b'findValue'
|
|
result = self.node_rpc.find_value(sender_contact, key, page)
|
|
|
|
self.send_response(
|
|
sender_contact, ResponseDatagram(RESPONSE_TYPE, message.rpc_id, self.node_id, result),
|
|
)
|
|
|
|
def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
|
|
# This is an RPC method request
|
|
self.received_request_metric.labels(method=request_datagram.method).inc()
|
|
self.peer_manager.report_last_requested(address[0], address[1])
|
|
try:
|
|
peer = self.routing_table.get_peer(request_datagram.node_id)
|
|
except IndexError:
|
|
try:
|
|
peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
|
|
except ValueError as err:
|
|
log.warning("error replying to %s: %s", address[0], str(err))
|
|
return
|
|
try:
|
|
self._handle_rpc(peer, request_datagram)
|
|
# if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it
|
|
# will be added to our routing table if successful
|
|
is_good = self.peer_manager.peer_is_good(peer)
|
|
if is_good is None:
|
|
self.ping_queue.enqueue_maybe_ping(peer)
|
|
# only add a requesting contact to the routing table if it has replied to one of our requests
|
|
elif is_good is True:
|
|
self.add_peer(peer)
|
|
except ValueError as err:
|
|
log.debug("error raised handling %s request from %s:%i - %s(%s)",
|
|
request_datagram.method, peer.address, peer.udp_port, str(type(err)),
|
|
str(err))
|
|
self.send_error(
|
|
peer,
|
|
ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(),
|
|
str(err).encode())
|
|
)
|
|
except Exception as err:
|
|
log.warning("error raised handling %s request from %s:%i - %s(%s)",
|
|
request_datagram.method, peer.address, peer.udp_port, str(type(err)),
|
|
str(err))
|
|
self.send_error(
|
|
peer,
|
|
ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(),
|
|
str(err).encode())
|
|
)
|
|
|
|
def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram):
|
|
# Find the message that triggered this response
|
|
if response_datagram.rpc_id in self.sent_messages:
|
|
peer, future, _ = self.sent_messages[response_datagram.rpc_id]
|
|
if peer.address != address[0]:
|
|
future.set_exception(
|
|
RemoteException(f"response from {address[0]}, expected {peer.address}")
|
|
)
|
|
return
|
|
|
|
# We got a result from the RPC
|
|
if peer.node_id == self.node_id:
|
|
future.set_exception(RemoteException("node has our node id"))
|
|
return
|
|
elif response_datagram.node_id == self.node_id:
|
|
future.set_exception(RemoteException("incoming message is from our node id"))
|
|
return
|
|
peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1])
|
|
self.peer_manager.report_last_replied(address[0], address[1])
|
|
self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
|
|
if not future.cancelled():
|
|
future.set_result(response_datagram)
|
|
self.add_peer(peer)
|
|
else:
|
|
log.warning("%s:%i replied, but after we cancelled the request attempt",
|
|
peer.address, peer.udp_port)
|
|
else:
|
|
# If the original message isn't found, it must have timed out
|
|
# TODO: we should probably do something with this...
|
|
pass
|
|
|
|
def handle_error_datagram(self, address, error_datagram: ErrorDatagram):
|
|
# The RPC request raised a remote exception; raise it locally
|
|
remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})")
|
|
if error_datagram.rpc_id in self.sent_messages:
|
|
peer, future, request = self.sent_messages.pop(error_datagram.rpc_id)
|
|
if (peer.address, peer.udp_port) != address:
|
|
future.set_exception(
|
|
RemoteException(
|
|
f"response from {address[0]}:{address[1]}, "
|
|
f"expected {peer.address}:{peer.udp_port}"
|
|
)
|
|
)
|
|
return
|
|
error_msg = f"" \
|
|
f"Error sending '{request.method}' to {peer.address}:{peer.udp_port}\n" \
|
|
f"Args: {request.args}\n" \
|
|
f"Raised: {str(remote_exception)}"
|
|
if 'Invalid token' in error_msg:
|
|
log.debug(error_msg)
|
|
elif error_datagram.response not in OLD_PROTOCOL_ERRORS:
|
|
log.warning(error_msg)
|
|
else:
|
|
log.debug(
|
|
"known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
|
peer.address, peer.udp_port, OLD_PROTOCOL_ERRORS[error_datagram.response]
|
|
)
|
|
future.set_exception(remote_exception)
|
|
return
|
|
else:
|
|
if error_datagram.response not in OLD_PROTOCOL_ERRORS:
|
|
msg = f"Received error from {address[0]}:{address[1]}, but it isn't in response to a " \
|
|
f"pending request: {str(remote_exception)}"
|
|
log.warning(msg)
|
|
else:
|
|
log.debug(
|
|
"known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
|
address[0], address[1], OLD_PROTOCOL_ERRORS[error_datagram.response]
|
|
)
|
|
|
|
def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-renamed
|
|
try:
|
|
message = decode_datagram(datagram)
|
|
except (ValueError, TypeError, DecodeError):
|
|
self.peer_manager.report_failure(address[0], address[1])
|
|
log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex())
|
|
return
|
|
|
|
if isinstance(message, RequestDatagram):
|
|
self.handle_request_datagram(address, message)
|
|
elif isinstance(message, ErrorDatagram):
|
|
self.handle_error_datagram(address, message)
|
|
else:
|
|
assert isinstance(message, ResponseDatagram), "sanity"
|
|
self.handle_response_datagram(address, message)
|
|
|
|
async def send_request(self, peer: 'KademliaPeer', request: RequestDatagram) -> ResponseDatagram:
|
|
self._send(peer, request)
|
|
response_fut = self.sent_messages[request.rpc_id][1]
|
|
try:
|
|
self.request_sent_metric.labels(method=request.method).inc()
|
|
start = time.perf_counter()
|
|
response = await asyncio.wait_for(response_fut, self.rpc_timeout)
|
|
self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start)
|
|
self.peer_manager.report_last_replied(peer.address, peer.udp_port)
|
|
self.request_success_metric.labels(method=request.method).inc()
|
|
return response
|
|
except asyncio.CancelledError:
|
|
if not response_fut.done():
|
|
response_fut.cancel()
|
|
raise
|
|
except (asyncio.TimeoutError, RemoteException):
|
|
self.peer_manager.report_failure(peer.address, peer.udp_port)
|
|
if self.peer_manager.peer_is_good(peer) is False:
|
|
self.remove_peer(peer)
|
|
raise
|
|
|
|
def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram):
|
|
self._send(peer, response)
|
|
|
|
def send_error(self, peer: 'KademliaPeer', error: ErrorDatagram):
|
|
self._send(peer, error)
|
|
|
|
def _send(self, peer: 'KademliaPeer', message: typing.Union[RequestDatagram, ResponseDatagram, ErrorDatagram]):
|
|
if not self.transport or self.transport.is_closing():
|
|
raise TransportNotConnected()
|
|
|
|
data = message.bencode()
|
|
if len(data) > constants.MSG_SIZE_LIMIT:
|
|
log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)",
|
|
constants.MSG_SIZE_LIMIT, len(data))
|
|
log.debug("Packet is too large to send: %s", data[:3500].hex())
|
|
raise ValueError(
|
|
f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)"
|
|
)
|
|
if isinstance(message, (RequestDatagram, ResponseDatagram)):
|
|
assert message.node_id == self.node_id, message
|
|
if isinstance(message, RequestDatagram):
|
|
assert self.node_id != peer.node_id
|
|
|
|
def pop_from_sent_messages(_):
|
|
if message.rpc_id in self.sent_messages:
|
|
self.sent_messages.pop(message.rpc_id)
|
|
|
|
if isinstance(message, RequestDatagram):
|
|
response_fut = self.loop.create_future()
|
|
response_fut.add_done_callback(pop_from_sent_messages)
|
|
self.sent_messages[message.rpc_id] = (peer, response_fut, message)
|
|
try:
|
|
self.transport.sendto(data, (peer.address, peer.udp_port))
|
|
except OSError as err:
|
|
# TODO: handle ENETUNREACH
|
|
if err.errno == socket.EWOULDBLOCK:
|
|
# i'm scared this may swallow important errors, but i get a million of these
|
|
# on Linux and it doesn't seem to affect anything -grin
|
|
log.warning("Can't send data to dht: EWOULDBLOCK")
|
|
else:
|
|
log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)",
|
|
len(data), peer.address, peer.udp_port, str(err), err.errno)
|
|
if isinstance(message, RequestDatagram):
|
|
self.sent_messages[message.rpc_id][1].set_exception(err)
|
|
else:
|
|
raise err
|
|
if isinstance(message, RequestDatagram):
|
|
self.peer_manager.report_last_sent(peer.address, peer.udp_port)
|
|
elif isinstance(message, ErrorDatagram):
|
|
self.peer_manager.report_failure(peer.address, peer.udp_port)
|
|
|
|
def change_token(self):
|
|
self.old_token_secret = self.token_secret
|
|
self.token_secret = constants.generate_id()
|
|
|
|
def make_token(self, compact_ip):
|
|
return constants.digest(self.token_secret + compact_ip)
|
|
|
|
def verify_token(self, token, compact_ip):
|
|
h = constants.HASH_CLASS()
|
|
h.update(self.token_secret + compact_ip)
|
|
if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token?
|
|
h = constants.HASH_CLASS()
|
|
h.update(self.old_token_secret + compact_ip)
|
|
if not token == h.digest():
|
|
return False
|
|
return True
|
|
|
|
async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer', # pylint: disable=too-many-return-statements
|
|
retry: bool = True) -> typing.Tuple[bytes, bool]:
|
|
async def __store():
|
|
res = await self.get_rpc_peer(peer).store(hash_value)
|
|
if res != b"OK":
|
|
raise ValueError(res)
|
|
log.debug("Stored %s to %s", hash_value.hex()[:8], peer)
|
|
return peer.node_id, True
|
|
|
|
try:
|
|
return await __store()
|
|
except asyncio.TimeoutError:
|
|
log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer)
|
|
return peer.node_id, False
|
|
except ValueError as err:
|
|
log.error("Unexpected response: %s", err)
|
|
return peer.node_id, False
|
|
except RemoteException as err:
|
|
if 'findValue() takes exactly 2 arguments (5 given)' in str(err):
|
|
log.debug("peer %s:%i is running an incompatible version of lbrynet", peer.address, peer.udp_port)
|
|
return peer.node_id, False
|
|
if 'Invalid token' not in str(err):
|
|
log.warning("Unexpected error while storing blob_hash: %s", err)
|
|
return peer.node_id, False
|
|
self.peer_manager.clear_token(peer.node_id)
|
|
if not retry:
|
|
return peer.node_id, False
|
|
return await self.store_to_peer(hash_value, peer, retry=False)
|