forked from LBRYCommunity/lbry-sdk
fix dht deadlock
This commit is contained in:
parent
309cfc840c
commit
0da2827c78
5 changed files with 42 additions and 13 deletions
|
@ -101,6 +101,7 @@ class Node:
|
||||||
self._refresh_task.cancel()
|
self._refresh_task.cancel()
|
||||||
if self.protocol and self.protocol.ping_queue.running:
|
if self.protocol and self.protocol.ping_queue.running:
|
||||||
self.protocol.ping_queue.stop()
|
self.protocol.ping_queue.stop()
|
||||||
|
self.protocol.stop()
|
||||||
if self.listening_port is not None:
|
if self.listening_port is not None:
|
||||||
self.listening_port.close()
|
self.listening_port.close()
|
||||||
self._join_task = None
|
self._join_task = None
|
||||||
|
@ -113,6 +114,7 @@ class Node:
|
||||||
lambda: self.protocol, (interface, self.internal_udp_port)
|
lambda: self.protocol, (interface, self.internal_udp_port)
|
||||||
)
|
)
|
||||||
log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port)
|
log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port)
|
||||||
|
self.protocol.start()
|
||||||
else:
|
else:
|
||||||
log.warning("Already bound to port %s", self.listening_port)
|
log.warning("Already bound to port %s", self.listening_port)
|
||||||
|
|
||||||
|
@ -130,7 +132,8 @@ class Node:
|
||||||
if known_node_urls:
|
if known_node_urls:
|
||||||
for host, port in known_node_urls:
|
for host, port in known_node_urls:
|
||||||
address = await resolve_host(host, port, proto='udp')
|
address = await resolve_host(host, port, proto='udp')
|
||||||
if (address, port) not in known_node_addresses and address != self.protocol.external_ip:
|
if (address, port) not in known_node_addresses and\
|
||||||
|
(address, port) != (self.protocol.external_ip, self.protocol.udp_port):
|
||||||
known_node_addresses.append((address, port))
|
known_node_addresses.append((address, port))
|
||||||
url_to_addr[address] = host
|
url_to_addr[address] = host
|
||||||
|
|
||||||
|
|
|
@ -207,7 +207,7 @@ class PingQueue:
|
||||||
try:
|
try:
|
||||||
if self._protocol.peer_manager.peer_is_good(peer):
|
if self._protocol.peer_manager.peer_is_good(peer):
|
||||||
if peer not in self._protocol.routing_table.get_peers():
|
if peer not in self._protocol.routing_table.get_peers():
|
||||||
await self._protocol.add_peer(peer)
|
self._protocol.add_peer(peer)
|
||||||
return
|
return
|
||||||
await self._protocol.get_rpc_peer(peer).ping()
|
await self._protocol.get_rpc_peer(peer).ping()
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
@ -268,11 +268,19 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
|
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
|
||||||
self.rpc_timeout = rpc_timeout
|
self.rpc_timeout = rpc_timeout
|
||||||
self._split_lock = asyncio.Lock(loop=self.loop)
|
self._split_lock = asyncio.Lock(loop=self.loop)
|
||||||
|
self._to_remove: typing.Set['KademliaPeer'] = set()
|
||||||
|
self._to_add: typing.Set['KademliaPeer'] = set()
|
||||||
|
self.maintaing_routing_task: typing.Optional[asyncio.Task] = None
|
||||||
|
|
||||||
def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
|
def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
|
||||||
return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer)
|
return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer)
|
||||||
|
|
||||||
|
def start(self, force_delay=None):
|
||||||
|
self.maintaing_routing_task = asyncio.create_task(self.routing_table_task(force_delay))
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
if self.maintaing_routing_task:
|
||||||
|
self.maintaing_routing_task.cancel()
|
||||||
if self.transport:
|
if self.transport:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
|
@ -299,6 +307,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
return args, {}
|
return args, {}
|
||||||
|
|
||||||
async def _add_peer(self, peer: 'KademliaPeer'):
|
async def _add_peer(self, peer: 'KademliaPeer'):
|
||||||
|
log.debug("Trying to add %s:%d", peer.address, peer.udp_port)
|
||||||
for p in self.routing_table.get_peers():
|
for p in self.routing_table.get_peers():
|
||||||
if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
|
if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
|
||||||
self.routing_table.remove_peer(p)
|
self.routing_table.remove_peer(p)
|
||||||
|
@ -363,11 +372,23 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self.routing_table.buckets[bucket_index].remove_peer(to_replace)
|
self.routing_table.buckets[bucket_index].remove_peer(to_replace)
|
||||||
return await self._add_peer(peer)
|
return await self._add_peer(peer)
|
||||||
|
|
||||||
async def add_peer(self, peer: 'KademliaPeer') -> bool:
|
def add_peer(self, peer: 'KademliaPeer') -> bool:
|
||||||
if peer.node_id == self.node_id:
|
if peer.node_id == self.node_id:
|
||||||
return False
|
return False
|
||||||
|
self._to_add.add(peer)
|
||||||
|
|
||||||
|
async def routing_table_task(self, force_delay=None):
|
||||||
|
while True:
|
||||||
|
while self._to_remove:
|
||||||
async with self._split_lock:
|
async with self._split_lock:
|
||||||
return await self._add_peer(peer)
|
peer = self._to_remove.pop()
|
||||||
|
log.debug("Trying to remove %s:%d", peer.address, peer.udp_port)
|
||||||
|
self.routing_table.remove_peer(peer)
|
||||||
|
self.routing_table.join_buckets()
|
||||||
|
while self._to_add:
|
||||||
|
async with self._split_lock:
|
||||||
|
await self._add_peer(self._to_add.pop())
|
||||||
|
await asyncio.sleep(force_delay or constants.rpc_timeout)
|
||||||
|
|
||||||
async def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
|
async def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
|
||||||
assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(),
|
assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(),
|
||||||
|
@ -416,7 +437,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self.ping_queue.enqueue_maybe_ping(peer)
|
self.ping_queue.enqueue_maybe_ping(peer)
|
||||||
# only add a requesting contact to the routing table if it has replied to one of our requests
|
# only add a requesting contact to the routing table if it has replied to one of our requests
|
||||||
elif is_good is True:
|
elif is_good is True:
|
||||||
await self.add_peer(peer)
|
self.add_peer(peer)
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
log.debug("error raised handling %s request from %s:%i - %s(%s)",
|
log.debug("error raised handling %s request from %s:%i - %s(%s)",
|
||||||
request_datagram.method, peer.address, peer.udp_port, str(type(err)),
|
request_datagram.method, peer.address, peer.udp_port, str(type(err)),
|
||||||
|
@ -459,7 +480,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
|
self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
|
||||||
if not df.cancelled():
|
if not df.cancelled():
|
||||||
df.set_result(response_datagram)
|
df.set_result(response_datagram)
|
||||||
await self.add_peer(peer)
|
self.add_peer(peer)
|
||||||
else:
|
else:
|
||||||
log.warning("%s:%i replied, but after we cancelled the request attempt",
|
log.warning("%s:%i replied, but after we cancelled the request attempt",
|
||||||
peer.address, peer.udp_port)
|
peer.address, peer.udp_port)
|
||||||
|
@ -531,9 +552,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
except (asyncio.TimeoutError, RemoteException):
|
except (asyncio.TimeoutError, RemoteException):
|
||||||
self.peer_manager.report_failure(peer.address, peer.udp_port)
|
self.peer_manager.report_failure(peer.address, peer.udp_port)
|
||||||
if self.peer_manager.peer_is_good(peer) is False:
|
if self.peer_manager.peer_is_good(peer) is False:
|
||||||
async with self._split_lock:
|
self._to_remove.add(peer)
|
||||||
self.routing_table.remove_peer(peer)
|
|
||||||
self.routing_table.join_buckets()
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram):
|
async def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram):
|
||||||
|
|
|
@ -99,6 +99,7 @@ class TestProtocol(AsyncioTestCase):
|
||||||
self.loop, PeerManager(self.loop), node_id, address, udp_port, tcp_port
|
self.loop, PeerManager(self.loop), node_id, address, udp_port, tcp_port
|
||||||
)
|
)
|
||||||
await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444))
|
await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444))
|
||||||
|
proto.start(0.1)
|
||||||
return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port)
|
return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port)
|
||||||
|
|
||||||
async def test_add_peer_after_handle_request(self):
|
async def test_add_peer_after_handle_request(self):
|
||||||
|
@ -112,6 +113,7 @@ class TestProtocol(AsyncioTestCase):
|
||||||
self.loop, PeerManager(self.loop), node_id1, '1.2.3.4', 4444, 3333
|
self.loop, PeerManager(self.loop), node_id1, '1.2.3.4', 4444, 3333
|
||||||
)
|
)
|
||||||
await self.loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
|
await self.loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
|
||||||
|
peer1.start(0.1)
|
||||||
|
|
||||||
peer2, peer_2_from_peer_1 = await self._make_protocol(peer1, node_id2, '1.2.3.5', 4444, 3333)
|
peer2, peer_2_from_peer_1 = await self._make_protocol(peer1, node_id2, '1.2.3.5', 4444, 3333)
|
||||||
peer3, peer_3_from_peer_1 = await self._make_protocol(peer1, node_id3, '1.2.3.6', 4444, 3333)
|
peer3, peer_3_from_peer_1 = await self._make_protocol(peer1, node_id3, '1.2.3.6', 4444, 3333)
|
||||||
|
@ -119,6 +121,7 @@ class TestProtocol(AsyncioTestCase):
|
||||||
|
|
||||||
# peers who reply should be added
|
# peers who reply should be added
|
||||||
await peer1.get_rpc_peer(peer_2_from_peer_1).ping()
|
await peer1.get_rpc_peer(peer_2_from_peer_1).ping()
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
self.assertListEqual([peer_2_from_peer_1], peer1.routing_table.get_peers())
|
self.assertListEqual([peer_2_from_peer_1], peer1.routing_table.get_peers())
|
||||||
peer1.routing_table.remove_peer(peer_2_from_peer_1)
|
peer1.routing_table.remove_peer(peer_2_from_peer_1)
|
||||||
|
|
||||||
|
@ -137,6 +140,7 @@ class TestProtocol(AsyncioTestCase):
|
||||||
self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
|
self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
|
||||||
pong = await peer1_from_peer4.ping()
|
pong = await peer1_from_peer4.ping()
|
||||||
self.assertEqual(b'pong', pong)
|
self.assertEqual(b'pong', pong)
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
self.assertEqual(1, len(peer1.routing_table.get_peers()))
|
self.assertEqual(1, len(peer1.routing_table.get_peers()))
|
||||||
self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
|
self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
|
||||||
peer1.routing_table.buckets[0].peers.clear()
|
peer1.routing_table.buckets[0].peers.clear()
|
||||||
|
|
|
@ -57,7 +57,7 @@ class TestRouting(AsyncioTestCase):
|
||||||
node.protocol.node_id, node.protocol.external_ip,
|
node.protocol.node_id, node.protocol.external_ip,
|
||||||
udp_port=node.protocol.udp_port
|
udp_port=node.protocol.udp_port
|
||||||
)
|
)
|
||||||
added = await node_1.protocol.add_peer(peer)
|
added = await node_1.protocol._add_peer(peer)
|
||||||
self.assertEqual(True, added)
|
self.assertEqual(True, added)
|
||||||
contact_cnt += 1
|
contact_cnt += 1
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ class TestRouting(AsyncioTestCase):
|
||||||
# set all of the peers to good (as to not attempt pinging stale ones during split)
|
# set all of the peers to good (as to not attempt pinging stale ones during split)
|
||||||
node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port)
|
node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port)
|
||||||
node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port)
|
node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port)
|
||||||
await node_1.protocol.add_peer(peer)
|
await node_1.protocol._add_peer(peer)
|
||||||
# check that bucket 0 is always the one covering the local node id
|
# check that bucket 0 is always the one covering the local node id
|
||||||
self.assertEqual(True, node_1.protocol.routing_table.buckets[0].key_in_range(node_1.protocol.node_id))
|
self.assertEqual(True, node_1.protocol.routing_table.buckets[0].key_in_range(node_1.protocol.node_id))
|
||||||
self.assertEqual(40, len(node_1.protocol.routing_table.get_peers()))
|
self.assertEqual(40, len(node_1.protocol.routing_table.get_peers()))
|
||||||
|
|
|
@ -21,6 +21,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
||||||
await self.storage.open()
|
await self.storage.open()
|
||||||
self.peer_manager = PeerManager(self.loop)
|
self.peer_manager = PeerManager(self.loop)
|
||||||
self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
|
self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
|
||||||
|
self.node.protocol.start(0.1)
|
||||||
await self.node.start_listening(address)
|
await self.node.start_listening(address)
|
||||||
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
|
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
|
||||||
for node_id, address in peer_addresses:
|
for node_id, address in peer_addresses:
|
||||||
|
@ -30,9 +31,10 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
||||||
async def add_peer(self, node_id, address, add_to_routing_table=True):
|
async def add_peer(self, node_id, address, add_to_routing_table=True):
|
||||||
n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
|
n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
|
||||||
await n.start_listening(address)
|
await n.start_listening(address)
|
||||||
|
n.protocol.start(0.1)
|
||||||
self.nodes.update({len(self.nodes): n})
|
self.nodes.update({len(self.nodes): n})
|
||||||
if add_to_routing_table:
|
if add_to_routing_table:
|
||||||
await self.node.protocol.add_peer(
|
self.node.protocol.add_peer(
|
||||||
self.peer_manager.get_kademlia_peer(
|
self.peer_manager.get_kademlia_peer(
|
||||||
n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
|
n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
|
||||||
)
|
)
|
||||||
|
@ -98,6 +100,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
||||||
await self.chain_peer(constants.generate_id(12), '1.2.3.12')
|
await self.chain_peer(constants.generate_id(12), '1.2.3.12')
|
||||||
await self.chain_peer(constants.generate_id(13), '1.2.3.13')
|
await self.chain_peer(constants.generate_id(13), '1.2.3.13')
|
||||||
await self.chain_peer(constants.generate_id(14), '1.2.3.14')
|
await self.chain_peer(constants.generate_id(14), '1.2.3.14')
|
||||||
|
await self.advance(61.0)
|
||||||
|
|
||||||
last = self.nodes[len(self.nodes) - 1]
|
last = self.nodes[len(self.nodes) - 1]
|
||||||
search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
|
search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
|
||||||
|
|
Loading…
Reference in a new issue