forked from LBRYCommunity/lbry-sdk

commit 7ad29b4bfb

    Merge pull request #1948 from lbryio/duplicates-in-routing

    Test and fix edge cases for adding peers to routing

5 changed files with 139 additions and 49 deletions
lbrynet/dht/constants.py

@@ -16,6 +16,7 @@ refresh_interval = 3600  # 1 hour
 replicate_interval = refresh_interval
 data_expiration = 86400  # 24 hours
 token_secret_refresh_interval = 300  # 5 minutes
+maybe_ping_delay = 300  # 5 minutes
 check_refresh_interval = refresh_interval / 5
 max_datagram_size = 8192  # 8 KB
 rpc_id_length = 20
lbrynet/dht/protocol/protocol.py

@@ -187,56 +187,46 @@ class PingQueue:
     def __init__(self, loop: asyncio.BaseEventLoop, protocol: 'KademliaProtocol'):
         self._loop = loop
         self._protocol = protocol
-        self._enqueued_contacts: typing.List['KademliaPeer'] = []
         self._pending_contacts: typing.Dict['KademliaPeer', float] = {}
         self._process_task: asyncio.Task = None
-        self._next_task: asyncio.Future = None
-        self._next_timer: asyncio.TimerHandle = None
         self._running = False
+        self._running_pings: typing.List[asyncio.Task] = []

     @property
     def running(self):
         return self._running

-    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
-        delay = constants.check_refresh_interval if delay is None else delay
+    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: float = constants.maybe_ping_delay):
+        now = self._loop.time()
         for peer in peers:
-            if delay and peer not in self._enqueued_contacts:
-                self._pending_contacts[peer] = self._loop.time() + delay
-            elif peer not in self._enqueued_contacts:
-                self._enqueued_contacts.append(peer)
-                if peer in self._pending_contacts:
-                    del self._pending_contacts[peer]
+            if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]:
+                self._pending_contacts[peer] = delay + now

-    async def _process(self):
-        async def _ping(p: 'KademliaPeer'):
+    def maybe_ping(self, peer: 'KademliaPeer'):
+        async def ping_task():
             try:
-                if self._protocol.peer_manager.peer_is_good(p):
-                    await self._protocol.add_peer(p)
+                if self._protocol.peer_manager.peer_is_good(peer):
+                    if peer not in self._protocol.routing_table.get_peers():
+                        await self._protocol.add_peer(peer)
                     return
-                await self._protocol.get_rpc_peer(p).ping()
+                await self._protocol.get_rpc_peer(peer).ping()
             except asyncio.TimeoutError:
                 pass

+        task = self._loop.create_task(ping_task())
+        task.add_done_callback(lambda _: None if task not in self._running_pings else self._running_pings.remove(task))
+        self._running_pings.append(task)
+
+    async def _process(self):  # send up to 1 ping per second
         while True:
-            tasks = []
-            if self._enqueued_contacts or self._pending_contacts:
-                now = self._loop.time()
-                scheduled = [k for k, d in self._pending_contacts.items() if now >= d]
-                for k in scheduled:
-                    del self._pending_contacts[k]
-                    if k not in self._enqueued_contacts:
-                        self._enqueued_contacts.append(k)
-                while self._enqueued_contacts:
-                    peer = self._enqueued_contacts.pop()
-                    tasks.append(self._loop.create_task(_ping(peer)))
-            if tasks:
-                await asyncio.wait(tasks, loop=self._loop)
-
-            f = self._loop.create_future()
-            self._loop.call_later(1.0, lambda: None if f.done() else f.set_result(None))
-            await f
+            enqueued = list(self._pending_contacts.keys())
+            now = self._loop.time()
+            for peer in enqueued:
+                if self._pending_contacts[peer] <= now:
+                    del self._pending_contacts[peer]
+                    self.maybe_ping(peer)
+                    break
+            await asyncio.sleep(1, loop=self._loop)

     def start(self):
         assert not self._running
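The net effect of this hunk: the old two-collection bookkeeping (`_enqueued_contacts` plus `_pending_contacts`) collapses into a single deadline map, re-enqueueing a peer can only move its ping earlier, and due pings drain at most one per second. A minimal self-contained model of that scheduling policy (the `MiniPingQueue` class and the plain-string peers are illustrative stand-ins, not the lbrynet API):

import asyncio
import typing

MAYBE_PING_DELAY = 300  # stand-in for constants.maybe_ping_delay (seconds)


class MiniPingQueue:
    """Toy model of the reworked PingQueue: one deadline map, at most one ping per second."""

    def __init__(self, loop: asyncio.AbstractEventLoop,
                 ping: typing.Callable[[str], typing.Awaitable[None]]):
        self._loop = loop
        self._ping = ping  # coroutine function standing in for the real maybe-ping RPC
        self._pending: typing.Dict[str, float] = {}  # peer -> earliest time to ping it
        self._running_pings: typing.List[asyncio.Task] = []

    def enqueue_maybe_ping(self, *peers: str, delay: float = MAYBE_PING_DELAY):
        now = self._loop.time()
        for peer in peers:
            # a re-enqueue may only move the deadline earlier, never postpone it
            if peer not in self._pending or now + delay < self._pending[peer]:
                self._pending[peer] = now + delay

    def maybe_ping(self, peer: str):
        task = self._loop.create_task(self._ping(peer))
        # tracked so stop() can cancel in-flight pings, as the diff's stop() does
        task.add_done_callback(
            lambda _: task in self._running_pings and self._running_pings.remove(task))
        self._running_pings.append(task)

    async def process(self):
        while True:  # mirrors _process: fire at most one due ping, then sleep a second
            now = self._loop.time()
            for peer in list(self._pending):
                if self._pending[peer] <= now:
                    del self._pending[peer]
                    self.maybe_ping(peer)
                    break
            await asyncio.sleep(1)

The earliest-deadline rule in `enqueue_maybe_ping` is what makes repeated requests from an unverified peer harmless: they cannot postpone the pending ping, only pull it forward.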
@@ -250,12 +240,8 @@ class PingQueue:
         if self._process_task:
             self._process_task.cancel()
             self._process_task = None
-        if self._next_task:
-            self._next_task.cancel()
-            self._next_task = None
-        if self._next_timer:
-            self._next_timer.cancel()
-            self._next_timer = None
+        while self._running_pings:
+            self._running_pings[0].cancel()


 class KademliaProtocol(DatagramProtocol):
@@ -313,9 +299,13 @@ class KademliaProtocol(DatagramProtocol):
         return args, {}

     async def _add_peer(self, peer: 'KademliaPeer'):
+        for p in self.routing_table.get_peers():
+            if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
+                self.routing_table.remove_peer(p)
         bucket_index = self.routing_table.kbucket_index(peer.node_id)
         if self.routing_table.buckets[bucket_index].add_peer(peer):
             return True

         # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
         if self.routing_table.should_split(bucket_index, peer.node_id):
             self.routing_table.split_bucket(bucket_index)
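This is the core duplicate fix named in the PR title: before a peer is slotted into a bucket, any routed peer occupying the same `(address, udp_port)` endpoint under a different node id is evicted, so a node that restarts with a fresh id replaces its stale entry instead of coexisting with it. A sketch of just that eviction rule over plain tuples (`evict_stale_endpoints` is an illustrative helper, not lbrynet code):

import typing

Peer = typing.Tuple[bytes, str, int]  # (node_id, address, udp_port)


def evict_stale_endpoints(routing: typing.List[Peer], new_peer: Peer) -> typing.List[Peer]:
    """Drop routed peers that share new_peer's endpoint but carry a different node id."""
    new_id, new_addr, new_port = new_peer
    return [
        (nid, addr, port) for nid, addr, port in routing
        if not ((addr, port) == (new_addr, new_port) and nid != new_id)
    ]


# a node at 1.2.3.5:4444 came back with a new id: its old entry is evicted,
# while an unrelated peer at another address is untouched
routing = [(b'old-id', '1.2.3.5', 4444), (b'other', '1.2.3.6', 4444)]
assert evict_stale_endpoints(routing, (b'new-id', '1.2.3.5', 4444)) == [(b'other', '1.2.3.6', 4444)]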
@@ -409,12 +399,13 @@ class KademliaProtocol(DatagramProtocol):
             sender_contact, ResponseDatagram(RESPONSE_TYPE, message.rpc_id, self.node_id, result),
         )

-    async def handle_request_datagram(self, address, request_datagram: RequestDatagram):
+    async def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
         # This is an RPC method request
         self.peer_manager.report_last_requested(address[0], address[1])
-        self.peer_manager.update_contact_triple(request_datagram.node_id, address[0], address[1])
-        # only add a requesting contact to the routing table if it has replied to one of our requests
-        peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1])
+        try:
+            peer = self.routing_table.get_peer(request_datagram.node_id)
+        except IndexError:
+            peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1])
         try:
             await self._handle_rpc(peer, request_datagram)
             # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it
@@ -422,6 +413,7 @@ class KademliaProtocol(DatagramProtocol):
             is_good = self.peer_manager.peer_is_good(peer)
             if is_good is None:
                 self.ping_queue.enqueue_maybe_ping(peer)
+            # only add a requesting contact to the routing table if it has replied to one of our requests
             elif is_good is True:
                 await self.add_peer(peer)
         except ValueError as err:
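Together, these two hunks make the routing table stricter about requesters: the peer object is looked up from the routing table first (falling back to a fresh `KademliaPeer` only for unknown senders), and an inbound request alone no longer earns a slot. The sender is added directly only when already known-good, maybe-pinged when unknown, and ignored when recently bad. A compact model of that three-way policy (function and argument names are illustrative):

import typing


def on_request_handled(is_good: typing.Optional[bool],
                       add_peer: typing.Callable[[], None],
                       enqueue_maybe_ping: typing.Callable[[], None]) -> str:
    """is_good tri-state: None = never verified, True = replied recently, False = failed recently."""
    if is_good is None:
        enqueue_maybe_ping()  # ping first; the peer joins the table only once it answers
        return 'maybe-pinged'
    if is_good:
        add_peer()  # already proven responsive: add directly
        return 'added'
    return 'ignored'  # recently bad: neither added nor pinged


assert on_request_handled(None, lambda: None, lambda: None) == 'maybe-pinged'
assert on_request_handled(True, lambda: None, lambda: None) == 'added'
assert on_request_handled(False, lambda: None, lambda: None) == 'ignored'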
@@ -449,11 +441,10 @@ class KademliaProtocol(DatagramProtocol):
         peer, df, request = self.sent_messages[response_datagram.rpc_id]
         if peer.address != address[0]:
             df.set_exception(RemoteException(
-                f"response from {address[0]}:{address[1]}, "
-                f"expected {peer.address}:{peer.udp_port}")
+                f"response from {address[0]}, expected {peer.address}")
             )
             return
-        peer.set_id(response_datagram.node_id)
+
         # We got a result from the RPC
         if peer.node_id == self.node_id:
             df.set_exception(RemoteException("node has our node id"))
@@ -461,6 +452,8 @@ class KademliaProtocol(DatagramProtocol):
         elif response_datagram.node_id == self.node_id:
             df.set_exception(RemoteException("incoming message is from our node id"))
             return
+        peer.set_id(response_datagram.node_id)
+        peer.update_udp_port(address[1])
         self.peer_manager.report_last_replied(address[0], address[1])
         self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
         if not df.cancelled():
lbrynet/dht/protocol/routing_table.py

@@ -49,7 +49,14 @@ class KBucket:
             self.peers.remove(peer)
             self.peers.append(peer)
             return True
-        elif len(self.peers) < constants.k:
+        else:
+            for i in range(len(self.peers)):
+                p = self.peers[i]
+                if p.node_id == peer.node_id:
+                    self.peers.remove(p)
+                    self.peers.append(peer)
+                    return True
+        if len(self.peers) < constants.k:
             self.peers.append(peer)
             return True
         else:
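With this change, `add_peer` has three distinct cases: the identical object refreshes its recency position, a different object carrying a known `node_id` replaces the old entry (an update, not a duplicate), and only a genuinely new id is checked against the bucket's `constants.k` capacity. A self-contained toy bucket demonstrating the same rules (`ToyPeer`/`ToyBucket` are illustrative, not lbrynet classes):

K = 8  # stand-in for constants.k


class ToyPeer:
    def __init__(self, node_id: bytes, udp_port: int):
        self.node_id, self.udp_port = node_id, udp_port


class ToyBucket:
    def __init__(self):
        self.peers: list = []

    def add_peer(self, peer: ToyPeer) -> bool:
        if peer in self.peers:  # same object: bump to most-recently-seen
            self.peers.remove(peer)
            self.peers.append(peer)
            return True
        for p in list(self.peers):  # known id, new details: replace, don't duplicate
            if p.node_id == peer.node_id:
                self.peers.remove(p)
                self.peers.append(peer)
                return True
        if len(self.peers) < K:  # genuinely new id: only now does capacity matter
            self.peers.append(peer)
            return True
        return False  # full bucket; the caller decides whether to split


bucket = ToyBucket()
a = ToyPeer(b'a' * 48, 4444)
assert bucket.add_peer(a) and bucket.peers == [a]
a_new_port = ToyPeer(b'a' * 48, 4445)  # same id, different port
assert bucket.add_peer(a_new_port) and bucket.peers == [a_new_port]  # updated, not duplicated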
tests/unit/dht/protocol/test_protocol.py

@@ -93,3 +93,65 @@ class TestProtocol(AsyncioTestCase):
         peer2.stop()
         peer1.disconnect()
         peer2.disconnect()
+
+    async def _make_protocol(self, other_peer, node_id, address, udp_port, tcp_port):
+        proto = KademliaProtocol(
+            self.loop, PeerManager(self.loop), node_id, address, udp_port, tcp_port
+        )
+        await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444))
+        return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port)
+
+    async def test_add_peer_after_handle_request(self):
+        with dht_mocks.mock_network_loop(self.loop):
+            node_id1 = constants.generate_id()
+            node_id2 = constants.generate_id()
+            node_id3 = constants.generate_id()
+            node_id4 = constants.generate_id()
+
+            peer1 = KademliaProtocol(
+                self.loop, PeerManager(self.loop), node_id1, '1.2.3.4', 4444, 3333
+            )
+            await self.loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
+
+            peer2, peer_2_from_peer_1 = await self._make_protocol(peer1, node_id2, '1.2.3.5', 4444, 3333)
+            peer3, peer_3_from_peer_1 = await self._make_protocol(peer1, node_id3, '1.2.3.6', 4444, 3333)
+            peer4, peer_4_from_peer_1 = await self._make_protocol(peer1, node_id4, '1.2.3.7', 4444, 3333)
+
+            # peers who reply should be added
+            await peer1.get_rpc_peer(peer_2_from_peer_1).ping()
+            self.assertListEqual([peer_2_from_peer_1], peer1.routing_table.get_peers())
+            peer1.routing_table.remove_peer(peer_2_from_peer_1)
+
+            # peers not known to be good/bad should be enqueued to maybe-ping
+            peer1_from_peer3 = peer3.get_rpc_peer(peer3.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444))
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            pong = await peer1_from_peer3.ping()
+            self.assertEqual(b'pong', pong)
+            self.assertEqual(1, len(peer1.ping_queue._pending_contacts))
+            peer1.ping_queue._pending_contacts.clear()
+
+            # peers who are already good should be added
+            peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444))
+            peer1.peer_manager.update_contact_triple(node_id4, '1.2.3.7', 4444)
+            peer1.peer_manager.report_last_replied('1.2.3.7', 4444)
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            pong = await peer1_from_peer4.ping()
+            self.assertEqual(b'pong', pong)
+            self.assertEqual(1, len(peer1.routing_table.get_peers()))
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            peer1.routing_table.buckets[0].peers.clear()
+
+            # peers who are known to be bad recently should not be added or maybe-pinged
+            peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444))
+            peer1.peer_manager.update_contact_triple(node_id4, '1.2.3.7', 4444)
+            peer1.peer_manager.report_failure('1.2.3.7', 4444)
+            peer1.peer_manager.report_failure('1.2.3.7', 4444)
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            pong = await peer1_from_peer4.ping()
+            self.assertEqual(b'pong', pong)
+            self.assertEqual(0, len(peer1.routing_table.get_peers()))
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+
+            for p in [peer1, peer2, peer3, peer4]:
+                p.stop()
+                p.disconnect()
tests/unit/dht/protocol/test_routing_table.py

@@ -2,7 +2,7 @@ import struct
 import asyncio
 from lbrynet.utils import generate_id
 from lbrynet.dht.protocol.routing_table import KBucket
-from lbrynet.dht.peer import PeerManager
+from lbrynet.dht.peer import PeerManager, KademliaPeer
 from lbrynet.dht import constants
 from torba.testcase import AsyncioTestCase
@@ -29,6 +29,33 @@ class TestKBucket(AsyncioTestCase):
         self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id())

     def test_add_peer(self):
+        peer = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4444)
+        peer_update2 = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4445)
+
+        self.assertListEqual([], self.kbucket.peers)
+
+        # add the peer
+        self.kbucket.add_peer(peer)
+        self.assertListEqual([peer], self.kbucket.peers)
+
+        # re-add it
+        self.kbucket.add_peer(peer)
+        self.assertListEqual([peer], self.kbucket.peers)
+        self.assertEqual(self.kbucket.peers[0].udp_port, 4444)
+
+        # add a new peer object with the same id and address but a different port
+        self.kbucket.add_peer(peer_update2)
+        self.assertListEqual([peer_update2], self.kbucket.peers)
+        self.assertEqual(self.kbucket.peers[0].udp_port, 4445)
+
+        # modify the peer object to have a different port
+        peer_update2.udp_port = 4444
+        self.kbucket.add_peer(peer_update2)
+        self.assertListEqual([peer_update2], self.kbucket.peers)
+        self.assertEqual(self.kbucket.peers[0].udp_port, 4444)
+
+        self.kbucket.peers.clear()
+
         # Test if contacts can be added to empty list
         # Add k contacts to bucket
         for i in range(constants.k):