lint: finish dht parts
This commit is contained in:
parent
5ed9c5e168
commit
c7f391ca44
7 changed files with 32 additions and 30 deletions
|
@ -38,7 +38,7 @@ class BlobAnnouncer:
|
||||||
continue
|
continue
|
||||||
self.announce_queue.extend(await self.storage.get_blobs_to_announce())
|
self.announce_queue.extend(await self.storage.get_blobs_to_announce())
|
||||||
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
||||||
while len(self.announce_queue):
|
while len(self.announce_queue) > 0:
|
||||||
log.info("%i blobs to announce", len(self.announce_queue))
|
log.info("%i blobs to announce", len(self.announce_queue))
|
||||||
announced = await asyncio.gather(*[
|
announced = await asyncio.gather(*[
|
||||||
self._submit_announcement(
|
self._submit_announcement(
|
||||||
|
|
|
@ -94,8 +94,10 @@ class Node:
|
||||||
)
|
)
|
||||||
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
|
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
|
||||||
if stored_to:
|
if stored_to:
|
||||||
log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
|
log.debug(
|
||||||
len(stored_to), len(peers))
|
"Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
|
||||||
|
len(stored_to), len(peers)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
log.debug("Failed announcing %s, stored to 0 peers", blob_hash[:8])
|
log.debug("Failed announcing %s, stored to 0 peers", blob_hash[:8])
|
||||||
return stored_to
|
return stored_to
|
||||||
|
@ -252,7 +254,7 @@ class Node:
|
||||||
result_queue.put_nowait(to_put)
|
result_queue.put_nowait(to_put)
|
||||||
|
|
||||||
def accumulate_peers(self, search_queue: asyncio.Queue,
|
def accumulate_peers(self, search_queue: asyncio.Queue,
|
||||||
peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
|
peer_queue: typing.Optional[asyncio.Queue] = None
|
||||||
asyncio.Queue, asyncio.Task]:
|
) -> typing.Tuple[asyncio.Queue, asyncio.Task]:
|
||||||
q = peer_queue or asyncio.Queue(loop=self.loop)
|
queue = peer_queue or asyncio.Queue(loop=self.loop)
|
||||||
return q, self.loop.create_task(self._accumulate_peers_for_value(search_queue, q))
|
return queue, self.loop.create_task(self._accumulate_peers_for_value(search_queue, queue))
|
||||||
|
|
|
@ -21,8 +21,8 @@ def make_kademlia_peer(node_id: typing.Optional[bytes], address: typing.Optional
|
||||||
|
|
||||||
|
|
||||||
# the ipaddress module does not show these subnets as reserved
|
# the ipaddress module does not show these subnets as reserved
|
||||||
carrier_grade_NAT_subnet = ipaddress.ip_network('100.64.0.0/10')
|
CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
|
||||||
ip4_to_6_relay_subnet = ipaddress.ip_network('192.88.99.0/24')
|
IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
|
||||||
|
|
||||||
ALLOW_LOCALHOST = False
|
ALLOW_LOCALHOST = False
|
||||||
|
|
||||||
|
@ -36,8 +36,8 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False):
|
||||||
return not any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local,
|
return not any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local,
|
||||||
parsed_ip.is_loopback, parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private,
|
parsed_ip.is_loopback, parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private,
|
||||||
parsed_ip.is_reserved,
|
parsed_ip.is_reserved,
|
||||||
carrier_grade_NAT_subnet.supernet_of(ipaddress.ip_network(f"{address}/32")),
|
CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")),
|
||||||
ip4_to_6_relay_subnet.supernet_of(ipaddress.ip_network(f"{address}/32"))))
|
IPV4_TO_6_RELAY_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32"))))
|
||||||
except ipaddress.AddressValueError:
|
except ipaddress.AddressValueError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -113,13 +113,13 @@ class PeerManager:
|
||||||
while to_pop:
|
while to_pop:
|
||||||
del self._rpc_failures[to_pop.pop()]
|
del self._rpc_failures[to_pop.pop()]
|
||||||
to_pop = []
|
to_pop = []
|
||||||
for node_id, (age, token) in self._node_tokens.items():
|
for node_id, (age, token) in self._node_tokens.items(): # pylint: disable=unused-variable
|
||||||
if age < now - constants.TOKEN_SECRET_REFRESH_INTERVAL:
|
if age < now - constants.TOKEN_SECRET_REFRESH_INTERVAL:
|
||||||
to_pop.append(node_id)
|
to_pop.append(node_id)
|
||||||
while to_pop:
|
while to_pop:
|
||||||
del self._node_tokens[to_pop.pop()]
|
del self._node_tokens[to_pop.pop()]
|
||||||
|
|
||||||
def contact_triple_is_good(self, node_id: bytes, address: str, udp_port: int):
|
def contact_triple_is_good(self, node_id: bytes, address: str, udp_port: int): # pylint: disable=too-many-return-statements
|
||||||
"""
|
"""
|
||||||
:return: False if peer is bad, None if peer is unknown, or True if peer is good
|
:return: False if peer is bad, None if peer is unknown, or True if peer is good
|
||||||
"""
|
"""
|
||||||
|
@ -154,7 +154,7 @@ class PeerManager:
|
||||||
def peer_is_good(self, peer: 'KademliaPeer'):
|
def peer_is_good(self, peer: 'KademliaPeer'):
|
||||||
return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port)
|
return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port)
|
||||||
|
|
||||||
def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer':
|
def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use
|
||||||
node_id, address, tcp_port = decode_compact_address(compact_address)
|
node_id, address, tcp_port = decode_compact_address(compact_address)
|
||||||
return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
|
return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,6 @@ class Distance:
|
||||||
val_key_two = int.from_bytes(key_two, 'big')
|
val_key_two = int.from_bytes(key_two, 'big')
|
||||||
return self.val_key_one ^ val_key_two
|
return self.val_key_one ^ val_key_two
|
||||||
|
|
||||||
def is_closer(self, a: bytes, b: bytes) -> bool:
|
def is_closer(self, key_a: bytes, key_b: bytes) -> bool:
|
||||||
"""Returns true is `a` is closer to `key` than `b` is"""
|
"""Returns true is `key_a` is closer to `key` than `key_b` is"""
|
||||||
return self(a) < self(b)
|
return self(key_a) < self(key_b)
|
||||||
|
|
|
@ -4,13 +4,13 @@ from itertools import chain
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
from lbry.dht.error import RemoteException, TransportNotConnected
|
from lbry.dht.error import RemoteException, TransportNotConnected
|
||||||
from lbry.dht.protocol.distance import Distance
|
from lbry.dht.protocol.distance import Distance
|
||||||
from lbry.dht.peer import make_kademlia_peer
|
from lbry.dht.peer import make_kademlia_peer
|
||||||
from lbry.dht.serialization.datagram import PAGE_KEY
|
from lbry.dht.serialization.datagram import PAGE_KEY
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from lbry.dht.protocol.routing_table import TreeRoutingTable
|
from lbry.dht.protocol.routing_table import TreeRoutingTable
|
||||||
from lbry.dht.protocol.protocol import KademliaProtocol
|
from lbry.dht.protocol.protocol import KademliaProtocol
|
||||||
|
@ -132,7 +132,7 @@ class IterativeFinder:
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def get_initial_result(self) -> typing.List['KademliaPeer']:
|
def get_initial_result(self) -> typing.List['KademliaPeer']: #pylint: disable=no-self-use
|
||||||
"""
|
"""
|
||||||
Get an initial or cached result to be put into the Queue. Used for findValue requests where the blob
|
Get an initial or cached result to be put into the Queue. Used for findValue requests where the blob
|
||||||
has peers in the local data store of blobs announced to us
|
has peers in the local data store of blobs announced to us
|
||||||
|
@ -282,8 +282,8 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
not_yet_yielded = [
|
not_yet_yielded = [
|
||||||
peer for peer in from_iter
|
peer for peer in from_iter
|
||||||
if peer not in self.yielded_peers
|
if peer not in self.yielded_peers
|
||||||
and peer.node_id != self.protocol.node_id
|
and peer.node_id != self.protocol.node_id
|
||||||
and self.peer_manager.peer_is_good(peer) is not False
|
and self.peer_manager.peer_is_good(peer) is not False
|
||||||
]
|
]
|
||||||
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
|
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
|
||||||
to_yield = not_yet_yielded[:min(constants.K, len(not_yet_yielded))]
|
to_yield = not_yet_yielded[:min(constants.K, len(not_yet_yielded))]
|
||||||
|
|
|
@ -51,9 +51,9 @@ class KBucket:
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
for i in range(len(self.peers)):
|
for i in range(len(self.peers)):
|
||||||
p = self.peers[i]
|
local_peer = self.peers[i]
|
||||||
if p.node_id == peer.node_id:
|
if local_peer.node_id == peer.node_id:
|
||||||
self.peers.remove(p)
|
self.peers.remove(local_peer)
|
||||||
self.peers.append(peer)
|
self.peers.append(peer)
|
||||||
return True
|
return True
|
||||||
if len(self.peers) < constants.K:
|
if len(self.peers) < constants.K:
|
||||||
|
@ -283,7 +283,7 @@ class TreeRoutingTable:
|
||||||
def join_buckets(self):
|
def join_buckets(self):
|
||||||
if len(self.buckets) == 1:
|
if len(self.buckets) == 1:
|
||||||
return
|
return
|
||||||
to_pop = [i for i, bucket in enumerate(self.buckets) if not len(bucket)]
|
to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
|
||||||
if not to_pop:
|
if not to_pop:
|
||||||
return
|
return
|
||||||
log.info("join buckets %i", len(to_pop))
|
log.info("join buckets %i", len(to_pop))
|
||||||
|
@ -314,6 +314,6 @@ class TreeRoutingTable:
|
||||||
def buckets_with_contacts(self) -> int:
|
def buckets_with_contacts(self) -> int:
|
||||||
count = 0
|
count = 0
|
||||||
for bucket in self.buckets:
|
for bucket in self.buckets:
|
||||||
if len(bucket):
|
if len(bucket) > 0:
|
||||||
count += 1
|
count += 1
|
||||||
return count
|
return count
|
||||||
|
|
|
@ -46,9 +46,9 @@ class KademliaDatagramBase:
|
||||||
i: getattr(self, k) for i, k in enumerate(self.required_fields)
|
i: getattr(self, k) for i, k in enumerate(self.required_fields)
|
||||||
}
|
}
|
||||||
for i, k in enumerate(OPTIONAL_FIELDS):
|
for i, k in enumerate(OPTIONAL_FIELDS):
|
||||||
v = getattr(self, k, None)
|
value = getattr(self, k, None)
|
||||||
if v is not None:
|
if value is not None:
|
||||||
datagram[i + OPTIONAL_ARG_OFFSET] = v
|
datagram[i + OPTIONAL_ARG_OFFSET] = value
|
||||||
return bencode(datagram)
|
return bencode(datagram)
|
||||||
|
|
||||||
|
|
||||||
|
@ -162,7 +162,7 @@ def decode_datagram(datagram: bytes) -> typing.Union[RequestDatagram, ResponseDa
|
||||||
for i, k in enumerate(datagram_class.required_fields)
|
for i, k in enumerate(datagram_class.required_fields)
|
||||||
if i in primitive # pylint: disable=unsupported-membership-test
|
if i in primitive # pylint: disable=unsupported-membership-test
|
||||||
}
|
}
|
||||||
for i, k in enumerate(OPTIONAL_FIELDS):
|
for i, _ in enumerate(OPTIONAL_FIELDS):
|
||||||
if i + OPTIONAL_ARG_OFFSET in primitive:
|
if i + OPTIONAL_ARG_OFFSET in primitive:
|
||||||
decoded[i + OPTIONAL_ARG_OFFSET] = primitive[i + OPTIONAL_ARG_OFFSET]
|
decoded[i + OPTIONAL_ARG_OFFSET] = primitive[i + OPTIONAL_ARG_OFFSET]
|
||||||
return datagram_class(**decoded)
|
return datagram_class(**decoded)
|
||||||
|
|
Loading…
Reference in a new issue