dht constants -> CONSTANTS (linting)

This commit is contained in:
Victor Shyba 2020-01-03 00:57:28 -03:00 committed by Lex Berezhny
parent cbc6d6a572
commit 10fbce056b
13 changed files with 106 additions and 106 deletions

View file

@ -511,7 +511,7 @@ class Config(CLIConfig):
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0) blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0)
peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0)
node_rpc_timeout = Float("Timeout when making a DHT request", constants.rpc_timeout) node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT)
# blob announcement and download # blob announcement and download
save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True) save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)

View file

@ -1,31 +1,31 @@
import hashlib import hashlib
import os import os
hash_class = hashlib.sha384 HASH_CLASS = hashlib.sha384 # pylint: disable=invalid-name
hash_length = hash_class().digest_size HASH_LENGTH = HASH_CLASS().digest_size
hash_bits = hash_length * 8 HASH_BITS = HASH_LENGTH * 8
alpha = 5 ALPHA = 5
k = 8 K = 8
split_buckets_under_index = 1 SPLIT_BUCKETS_UNDER_INDEX = 1
replacement_cache_size = 8 REPLACEMENT_CACHE_SIZE = 8
rpc_timeout = 5.0 RPC_TIMEOUT = 5.0
rpc_attempts = 5 RPC_ATTEMPTS = 5
rpc_attempts_pruning_window = 600 RPC_ATTEMPTS_PRUNING_WINDOW = 600
iterative_lookup_delay = rpc_timeout / 2.0 # TODO: use config val / 2 if rpc timeout is provided ITERATIVE_LOOKUP_DELAY = RPC_TIMEOUT / 2.0 # TODO: use config val / 2 if rpc timeout is provided
refresh_interval = 3600 # 1 hour REFRESH_INTERVAL = 3600 # 1 hour
replicate_interval = refresh_interval REPLICATE_INTERVAL = REFRESH_INTERVAL
data_expiration = 86400 # 24 hours DATA_EXPIRATION = 86400 # 24 hours
token_secret_refresh_interval = 300 # 5 minutes TOKEN_SECRET_REFRESH_INTERVAL = 300 # 5 minutes
maybe_ping_delay = 300 # 5 minutes MAYBE_PING_DELAY = 300 # 5 minutes
check_refresh_interval = refresh_interval / 5 CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
rpc_id_length = 20 RPC_ID_LENGTH = 20
protocol_version = 1 PROTOCOL_VERSION = 1
bottom_out_limit = 3 BOTTOM_OUT_LIMIT = 3
msg_size_limit = 1400 MSG_SIZE_LIMIT = 1400
def digest(data: bytes) -> bytes: def digest(data: bytes) -> bytes:
h = hash_class() h = HASH_CLASS()
h.update(data) h.update(data)
return h.digest() return h.digest()
@ -38,4 +38,4 @@ def generate_id(num=None) -> bytes:
def generate_rpc_id(num=None) -> bytes: def generate_rpc_id(num=None) -> bytes:
return generate_id(num)[:rpc_id_length] return generate_id(num)[:RPC_ID_LENGTH]

View file

@ -19,8 +19,8 @@ log = logging.getLogger(__name__)
class Node: class Node:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.rpc_timeout, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
split_buckets_under_index: int = constants.split_buckets_under_index, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
storage: typing.Optional['SQLiteStorage'] = None): storage: typing.Optional['SQLiteStorage'] = None):
self.loop = loop self.loop = loop
self.internal_udp_port = internal_udp_port self.internal_udp_port = internal_udp_port
@ -62,7 +62,7 @@ class Node:
if force_once: if force_once:
break break
fut = asyncio.Future(loop=self.loop) fut = asyncio.Future(loop=self.loop)
self.loop.call_later(constants.refresh_interval // 4, fut.set_result, None) self.loop.call_later(constants.REFRESH_INTERVAL // 4, fut.set_result, None)
await fut await fut
continue continue
@ -76,12 +76,12 @@ class Node:
break break
fut = asyncio.Future(loop=self.loop) fut = asyncio.Future(loop=self.loop)
self.loop.call_later(constants.refresh_interval, fut.set_result, None) self.loop.call_later(constants.REFRESH_INTERVAL, fut.set_result, None)
await fut await fut
async def announce_blob(self, blob_hash: str) -> typing.List[bytes]: async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
hash_value = binascii.unhexlify(blob_hash.encode()) hash_value = binascii.unhexlify(blob_hash.encode())
assert len(hash_value) == constants.hash_length assert len(hash_value) == constants.HASH_LENGTH
peers = await self.peer_search(hash_value) peers = await self.peer_search(hash_value)
if not self.protocol.external_ip: if not self.protocol.external_ip:
@ -175,8 +175,8 @@ class Node:
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.bottom_out_limit, bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.k) -> IterativeNodeFinder: max_results: int = constants.K) -> IterativeNodeFinder:
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist) key, bottom_out_limit, max_results, None, shortlist)
@ -188,7 +188,7 @@ class Node:
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist) key, bottom_out_limit, max_results, None, shortlist)
async def peer_search(self, node_id: bytes, count=constants.k, max_results=constants.k*2, async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']: ) -> typing.List['KademliaPeer']:
peers = [] peers = []

View file

@ -85,7 +85,7 @@ class PeerManager:
def get_node_token(self, node_id: bytes) -> typing.Optional[bytes]: def get_node_token(self, node_id: bytes) -> typing.Optional[bytes]:
ts, token = self._node_tokens.get(node_id, (0, None)) ts, token = self._node_tokens.get(node_id, (0, None))
if ts and ts > self._loop.time() - constants.token_secret_refresh_interval: if ts and ts > self._loop.time() - constants.TOKEN_SECRET_REFRESH_INTERVAL:
return token return token
def get_last_replied(self, address: str, udp_port: int) -> typing.Optional[float]: def get_last_replied(self, address: str, udp_port: int) -> typing.Optional[float]:
@ -108,13 +108,13 @@ class PeerManager:
now = self._loop.time() now = self._loop.time()
to_pop = [] to_pop = []
for (address, udp_port), (_, last_failure) in self._rpc_failures.items(): for (address, udp_port), (_, last_failure) in self._rpc_failures.items():
if last_failure and last_failure < now - constants.rpc_attempts_pruning_window: if last_failure and last_failure < now - constants.RPC_ATTEMPTS_PRUNING_WINDOW:
to_pop.append((address, udp_port)) to_pop.append((address, udp_port))
while to_pop: while to_pop:
del self._rpc_failures[to_pop.pop()] del self._rpc_failures[to_pop.pop()]
to_pop = [] to_pop = []
for node_id, (age, token) in self._node_tokens.items(): for node_id, (age, token) in self._node_tokens.items():
if age < now - constants.token_secret_refresh_interval: if age < now - constants.TOKEN_SECRET_REFRESH_INTERVAL:
to_pop.append(node_id) to_pop.append(node_id)
while to_pop: while to_pop:
del self._node_tokens[to_pop.pop()] del self._node_tokens[to_pop.pop()]
@ -124,7 +124,7 @@ class PeerManager:
:return: False if peer is bad, None if peer is unknown, or True if peer is good :return: False if peer is bad, None if peer is unknown, or True if peer is good
""" """
delay = self._loop.time() - constants.check_refresh_interval delay = self._loop.time() - constants.CHECK_REFRESH_INTERVAL
# fixme: find a way to re-enable that without breaking other parts # fixme: find a way to re-enable that without breaking other parts
# if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping: # if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping:
@ -170,7 +170,7 @@ class KademliaPeer:
def __post_init__(self): def __post_init__(self):
if self._node_id is not None: if self._node_id is not None:
if not len(self._node_id) == constants.hash_length: if not len(self._node_id) == constants.HASH_LENGTH:
raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode())) raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode()))
if self.udp_port is not None and not 1 <= self.udp_port <= 65535: if self.udp_port is not None and not 1 <= self.udp_port <= 65535:
raise ValueError("invalid udp port") raise ValueError("invalid udp port")

View file

@ -22,7 +22,7 @@ class DictDataStore:
for key in keys: for key in keys:
to_remove = [] to_remove = []
for (peer, ts) in self._data_store[key]: for (peer, ts) in self._data_store[key]:
if ts + constants.data_expiration < now or self._peer_manager.peer_is_good(peer) is False: if ts + constants.DATA_EXPIRATION < now or self._peer_manager.peer_is_good(peer) is False:
to_remove.append((peer, ts)) to_remove.append((peer, ts))
for item in to_remove: for item in to_remove:
self._data_store[key].remove(item) self._data_store[key].remove(item)
@ -43,7 +43,7 @@ class DictDataStore:
""" """
now = self.loop.time() now = self.loop.time()
for (peer, ts) in self._data_store.get(key, []): for (peer, ts) in self._data_store.get(key, []):
if ts + constants.data_expiration > now: if ts + constants.DATA_EXPIRATION > now:
yield peer yield peer
def has_peers_for_blob(self, key: bytes) -> bool: def has_peers_for_blob(self, key: bytes) -> bool:

View file

@ -9,13 +9,13 @@ class Distance:
""" """
def __init__(self, key: bytes): def __init__(self, key: bytes):
if len(key) != constants.hash_length: if len(key) != constants.HASH_LENGTH:
raise ValueError(f"invalid key length: {len(key)}") raise ValueError(f"invalid key length: {len(key)}")
self.key = key self.key = key
self.val_key_one = int.from_bytes(key, 'big') self.val_key_one = int.from_bytes(key, 'big')
def __call__(self, key_two: bytes) -> int: def __call__(self, key_two: bytes) -> int:
if len(key_two) != constants.hash_length: if len(key_two) != constants.HASH_LENGTH:
raise ValueError(f"invalid length of key to compare: {len(key_two)}") raise ValueError(f"invalid length of key to compare: {len(key_two)}")
val_key_two = int.from_bytes(key_two, 'big') val_key_two = int.from_bytes(key_two, 'big')
return self.val_key_one ^ val_key_two return self.val_key_one ^ val_key_two

View file

@ -67,7 +67,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
:param shortlist: optional manually provided shortlist, this is done during bootstrapping when there are no :param shortlist: optional manually provided shortlist, this is done during bootstrapping when there are no
peers in the routing table. During bootstrap the shortlist is set to be the seed nodes. peers in the routing table. During bootstrap the shortlist is set to be the seed nodes.
""" """
if len(key) != constants.hash_length: if len(key) != constants.HASH_LENGTH:
raise ValueError("invalid key length: %i" % len(key)) raise ValueError("invalid key length: %i" % len(key))
return shortlist or routing_table.find_close_peers(key) return shortlist or routing_table.find_close_peers(key)
@ -75,10 +75,10 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder: class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.hash_length: if len(key) != constants.HASH_LENGTH:
raise ValueError("invalid key length: %i" % len(key)) raise ValueError("invalid key length: %i" % len(key))
self.loop = loop self.loop = loop
self.peer_manager = peer_manager self.peer_manager = peer_manager
@ -185,7 +185,7 @@ class IterativeFinder:
to_probe = list(self.active - self.contacted) to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(self.key)) to_probe.sort(key=lambda peer: self.distance(self.key))
for peer in to_probe: for peer in to_probe:
if added >= constants.alpha: if added >= constants.ALPHA:
break break
origin_address = (peer.address, peer.udp_port) origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude: if origin_address in self.exclude:
@ -216,7 +216,7 @@ class IterativeFinder:
t.add_done_callback(callback) t.add_done_callback(callback)
self.running_probes.add(t) self.running_probes.add(t)
async def _search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay): async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
try: try:
if self.running: if self.running:
await self._search_round() await self._search_round()
@ -263,7 +263,7 @@ class IterativeFinder:
class IterativeNodeFinder(IterativeFinder): class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
@ -286,7 +286,7 @@ class IterativeNodeFinder(IterativeFinder):
and self.peer_manager.peer_is_good(peer) is not False and self.peer_manager.peer_is_good(peer) is not False
] ]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))] to_yield = not_yet_yielded[:min(constants.K, len(not_yet_yielded))]
if to_yield: if to_yield:
self.yielded_peers.update(to_yield) self.yielded_peers.update(to_yield)
self.iteration_queue.put_nowait(to_yield) self.iteration_queue.put_nowait(to_yield)
@ -314,7 +314,7 @@ class IterativeNodeFinder(IterativeFinder):
class IterativeValueFinder(IterativeFinder): class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
@ -348,7 +348,7 @@ class IterativeValueFinder(IterativeFinder):
if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses): if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses):
log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port) log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port)
parsed.found_compact_addresses.clear() parsed.found_compact_addresses.clear()
elif len(parsed.found_compact_addresses) >= constants.k and self.peer_pages[peer] < parsed.pages: elif len(parsed.found_compact_addresses) >= constants.K and self.peer_pages[peer] < parsed.pages:
# the peer returned a full page and indicates it has more # the peer returned a full page and indicates it has more
self.peer_pages[peer] += 1 self.peer_pages[peer] += 1
if peer in self.contacted: if peer in self.contacted:

View file

@ -48,13 +48,13 @@ class KademliaRPC:
return b'pong' return b'pong'
def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, port: int) -> bytes: def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, port: int) -> bytes:
if len(blob_hash) != constants.hash_bits // 8: if len(blob_hash) != constants.HASH_BITS // 8:
raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") raise ValueError(f"invalid length of blob hash: {len(blob_hash)}")
if not 0 < port < 65535: if not 0 < port < 65535:
raise ValueError(f"invalid tcp port: {port}") raise ValueError(f"invalid tcp port: {port}")
rpc_contact.update_tcp_port(port) rpc_contact.update_tcp_port(port)
if not self.verify_token(token, rpc_contact.compact_ip()): if not self.verify_token(token, rpc_contact.compact_ip()):
if self.loop.time() - self.protocol.started_listening_time < constants.token_secret_refresh_interval: if self.loop.time() - self.protocol.started_listening_time < constants.TOKEN_SECRET_REFRESH_INTERVAL:
pass pass
else: else:
raise ValueError("Invalid token") raise ValueError("Invalid token")
@ -64,19 +64,19 @@ class KademliaRPC:
return b'OK' return b'OK'
def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
if len(key) != constants.hash_length: if len(key) != constants.HASH_LENGTH:
raise ValueError("invalid contact node_id length: %i" % len(key)) raise ValueError("invalid contact node_id length: %i" % len(key))
contacts = self.protocol.routing_table.find_close_peers(key, sender_node_id=rpc_contact.node_id) contacts = self.protocol.routing_table.find_close_peers(key, sender_node_id=rpc_contact.node_id)
contact_triples = [] contact_triples = []
for contact in contacts[:constants.k * 2]: for contact in contacts[:constants.K * 2]:
contact_triples.append((contact.node_id, contact.address, contact.udp_port)) contact_triples.append((contact.node_id, contact.address, contact.udp_port))
return contact_triples return contact_triples
def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0): def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0):
page = page if page > 0 else 0 page = page if page > 0 else 0
if len(key) != constants.hash_length: if len(key) != constants.HASH_LENGTH:
raise ValueError("invalid blob_exchange hash length: %i" % len(key)) raise ValueError("invalid blob_exchange hash length: %i" % len(key))
response = { response = {
@ -84,7 +84,7 @@ class KademliaRPC:
} }
if not page: if not page:
response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.k] response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.K]
if self.protocol.protocol_version: if self.protocol.protocol_version:
response[b'protocolVersion'] = self.protocol.protocol_version response[b'protocolVersion'] = self.protocol.protocol_version
@ -96,16 +96,16 @@ class KademliaRPC:
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
] ]
# if we don't have k storing peers to return and we have this hash locally, include our contact information # if we don't have k storing peers to return and we have this hash locally, include our contact information
if len(peers) < constants.k and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs: if len(peers) < constants.K and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs:
peers.append(self.compact_address()) peers.append(self.compact_address())
if not peers: if not peers:
response[PAGE_KEY] = 0 response[PAGE_KEY] = 0
else: else:
response[PAGE_KEY] = (len(peers) // (constants.k + 1)) + 1 # how many pages of peers we have for the blob response[PAGE_KEY] = (len(peers) // (constants.K + 1)) + 1 # how many pages of peers we have for the blob
if len(peers) > constants.k: if len(peers) > constants.K:
random.Random(self.protocol.node_id).shuffle(peers) random.Random(self.protocol.node_id).shuffle(peers)
if page * constants.k < len(peers): if page * constants.K < len(peers):
response[key] = peers[page * constants.k:page * constants.k + constants.k] response[key] = peers[page * constants.K:page * constants.K + constants.K]
return response return response
def refresh_token(self): # TODO: this needs to be called periodically def refresh_token(self): # TODO: this needs to be called periodically
@ -154,7 +154,7 @@ class RemoteKademliaRPC:
:param blob_hash: blob hash as bytes :param blob_hash: blob hash as bytes
:return: b'OK' :return: b'OK'
""" """
if len(blob_hash) != constants.hash_bits // 8: if len(blob_hash) != constants.HASH_BITS // 8:
raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") raise ValueError(f"invalid length of blob hash: {len(blob_hash)}")
if not self.protocol.peer_port or not 0 < self.protocol.peer_port < 65535: if not self.protocol.peer_port or not 0 < self.protocol.peer_port < 65535:
raise ValueError(f"invalid tcp port: {self.protocol.peer_port}") raise ValueError(f"invalid tcp port: {self.protocol.peer_port}")
@ -171,7 +171,7 @@ class RemoteKademliaRPC:
""" """
:return: [(node_id, address, udp_port), ...] :return: [(node_id, address, udp_port), ...]
""" """
if len(key) != constants.hash_bits // 8: if len(key) != constants.HASH_BITS // 8:
raise ValueError(f"invalid length of find node key: {len(key)}") raise ValueError(f"invalid length of find node key: {len(key)}")
response = await self.protocol.send_request( response = await self.protocol.send_request(
self.peer, RequestDatagram.make_find_node(self.protocol.node_id, key) self.peer, RequestDatagram.make_find_node(self.protocol.node_id, key)
@ -186,7 +186,7 @@ class RemoteKademliaRPC:
<key bytes>: [<blob_peer_compact_address, ...] <key bytes>: [<blob_peer_compact_address, ...]
} }
""" """
if len(key) != constants.hash_bits // 8: if len(key) != constants.HASH_BITS // 8:
raise ValueError(f"invalid length of find value key: {len(key)}") raise ValueError(f"invalid length of find value key: {len(key)}")
response = await self.protocol.send_request( response = await self.protocol.send_request(
self.peer, RequestDatagram.make_find_value(self.protocol.node_id, key, page=page) self.peer, RequestDatagram.make_find_value(self.protocol.node_id, key, page=page)
@ -203,7 +203,7 @@ class PingQueue:
self._process_task: asyncio.Task = None self._process_task: asyncio.Task = None
self._running = False self._running = False
self._running_pings: typing.Set[asyncio.Task] = set() self._running_pings: typing.Set[asyncio.Task] = set()
self._default_delay = constants.maybe_ping_delay self._default_delay = constants.MAYBE_PING_DELAY
@property @property
def running(self): def running(self):
@ -260,8 +260,8 @@ class PingQueue:
class KademliaProtocol(DatagramProtocol): class KademliaProtocol(DatagramProtocol):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
udp_port: int, peer_port: int, rpc_timeout: float = constants.rpc_timeout, udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
split_buckets_under_index: int = constants.split_buckets_under_index): split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
self.peer_manager = peer_manager self.peer_manager = peer_manager
self.loop = loop self.loop = loop
self.node_id = node_id self.node_id = node_id
@ -271,7 +271,7 @@ class KademliaProtocol(DatagramProtocol):
self.is_seed_node = False self.is_seed_node = False
self.partial_messages: typing.Dict[bytes, typing.Dict[bytes, bytes]] = {} self.partial_messages: typing.Dict[bytes, typing.Dict[bytes, bytes]] = {}
self.sent_messages: typing.Dict[bytes, typing.Tuple['KademliaPeer', asyncio.Future, RequestDatagram]] = {} self.sent_messages: typing.Dict[bytes, typing.Tuple['KademliaPeer', asyncio.Future, RequestDatagram]] = {}
self.protocol_version = constants.protocol_version self.protocol_version = constants.PROTOCOL_VERSION
self.started_listening_time = 0 self.started_listening_time = 0
self.transport: DatagramTransport = None self.transport: DatagramTransport = None
self.old_token_secret = constants.generate_id() self.old_token_secret = constants.generate_id()
@ -589,12 +589,12 @@ class KademliaProtocol(DatagramProtocol):
raise TransportNotConnected() raise TransportNotConnected()
data = message.bencode() data = message.bencode()
if len(data) > constants.msg_size_limit: if len(data) > constants.MSG_SIZE_LIMIT:
log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)", log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)",
constants.msg_size_limit, len(data)) constants.MSG_SIZE_LIMIT, len(data))
log.debug("Packet is too large to send: %s", binascii.hexlify(data[:3500]).decode()) log.debug("Packet is too large to send: %s", binascii.hexlify(data[:3500]).decode())
raise ValueError( raise ValueError(
f"cannot send datagram larger than {constants.msg_size_limit} bytes (packet is {len(data)} bytes)" f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)"
) )
if isinstance(message, (RequestDatagram, ResponseDatagram)): if isinstance(message, (RequestDatagram, ResponseDatagram)):
assert message.node_id == self.node_id, message assert message.node_id == self.node_id, message
@ -637,10 +637,10 @@ class KademliaProtocol(DatagramProtocol):
return constants.digest(self.token_secret + compact_ip) return constants.digest(self.token_secret + compact_ip)
def verify_token(self, token, compact_ip): def verify_token(self, token, compact_ip):
h = constants.hash_class() h = constants.HASH_CLASS()
h.update(self.token_secret + compact_ip) h.update(self.token_secret + compact_ip)
if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token? if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token?
h = constants.hash_class() h = constants.HASH_CLASS()
h.update(self.old_token_secret + compact_ip) h.update(self.old_token_secret + compact_ip)
if not token == h.digest(): if not token == h.digest():
return False return False

View file

@ -56,7 +56,7 @@ class KBucket:
self.peers.remove(p) self.peers.remove(p)
self.peers.append(peer) self.peers.append(peer)
return True return True
if len(self.peers) < constants.k: if len(self.peers) < constants.K:
self.peers.append(peer) self.peers.append(peer)
return True return True
else: else:
@ -101,8 +101,8 @@ class KBucket:
current_len = len(peers) current_len = len(peers)
# If count greater than k - return only k contacts # If count greater than k - return only k contacts
if count > constants.k: if count > constants.K:
count = constants.k count = constants.K
if not current_len: if not current_len:
return peers return peers
@ -164,14 +164,14 @@ class TreeRoutingTable:
""" """
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
split_buckets_under_index: int = constants.split_buckets_under_index): split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
self._loop = loop self._loop = loop
self._peer_manager = peer_manager self._peer_manager = peer_manager
self._parent_node_id = parent_node_id self._parent_node_id = parent_node_id
self._split_buckets_under_index = split_buckets_under_index self._split_buckets_under_index = split_buckets_under_index
self.buckets: typing.List[KBucket] = [ self.buckets: typing.List[KBucket] = [
KBucket( KBucket(
self._peer_manager, range_min=0, range_max=2 ** constants.hash_bits, node_id=self._parent_node_id self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id
) )
] ]
@ -185,7 +185,7 @@ class TreeRoutingTable:
contacts = self.get_peers() contacts = self.get_peers()
distance = Distance(self._parent_node_id) distance = Distance(self._parent_node_id)
contacts.sort(key=lambda c: distance(c.node_id)) contacts.sort(key=lambda c: distance(c.node_id))
kth_contact = contacts[-1] if len(contacts) < constants.k else contacts[constants.k - 1] kth_contact = contacts[-1] if len(contacts) < constants.K else contacts[constants.K - 1]
return distance(to_add) < distance(kth_contact.node_id) return distance(to_add) < distance(kth_contact.node_id)
def find_close_peers(self, key: bytes, count: typing.Optional[int] = None, def find_close_peers(self, key: bytes, count: typing.Optional[int] = None,
@ -193,7 +193,7 @@ class TreeRoutingTable:
exclude = [self._parent_node_id] exclude = [self._parent_node_id]
if sender_node_id: if sender_node_id:
exclude.append(sender_node_id) exclude.append(sender_node_id)
count = count or constants.k count = count or constants.K
distance = Distance(key) distance = Distance(key)
contacts = self.get_peers() contacts = self.get_peers()
contacts = [c for c in contacts if c.node_id not in exclude] contacts = [c for c in contacts if c.node_id not in exclude]
@ -214,7 +214,7 @@ class TreeRoutingTable:
refresh_ids = [] refresh_ids = []
now = int(self._loop.time()) now = int(self._loop.time())
for bucket in self.buckets[start_index:]: for bucket in self.buckets[start_index:]:
if force or now - bucket.last_accessed >= constants.refresh_interval: if force or now - bucket.last_accessed >= constants.REFRESH_INTERVAL:
to_search = self.midpoint_id_in_bucket_range(bucket_index) to_search = self.midpoint_id_in_bucket_range(bucket_index)
refresh_ids.append(to_search) refresh_ids.append(to_search)
bucket_index += 1 bucket_index += 1
@ -248,13 +248,13 @@ class TreeRoutingTable:
random_id = int(random.randrange(self.buckets[bucket_index].range_min, self.buckets[bucket_index].range_max)) random_id = int(random.randrange(self.buckets[bucket_index].range_min, self.buckets[bucket_index].range_max))
return Distance( return Distance(
self._parent_node_id self._parent_node_id
)(random_id.to_bytes(constants.hash_length, 'big')).to_bytes(constants.hash_length, 'big') )(random_id.to_bytes(constants.HASH_LENGTH, 'big')).to_bytes(constants.HASH_LENGTH, 'big')
def midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes: def midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes:
half = int((self.buckets[bucket_index].range_max - self.buckets[bucket_index].range_min) // 2) half = int((self.buckets[bucket_index].range_max - self.buckets[bucket_index].range_min) // 2)
return Distance(self._parent_node_id)( return Distance(self._parent_node_id)(
int(self.buckets[bucket_index].range_min + half).to_bytes(constants.hash_length, 'big') int(self.buckets[bucket_index].range_min + half).to_bytes(constants.HASH_LENGTH, 'big')
).to_bytes(constants.hash_length, 'big') ).to_bytes(constants.HASH_LENGTH, 'big')
def split_bucket(self, old_bucket_index: int) -> None: def split_bucket(self, old_bucket_index: int) -> None:
""" Splits the specified k-bucket into two new buckets which together """ Splits the specified k-bucket into two new buckets which together

View file

@ -34,9 +34,9 @@ class KademliaDatagramBase:
self.packet_type = packet_type self.packet_type = packet_type
if self.expected_packet_type != packet_type: if self.expected_packet_type != packet_type:
raise ValueError(f"invalid packet type: {packet_type}, expected {self.expected_packet_type}") raise ValueError(f"invalid packet type: {packet_type}, expected {self.expected_packet_type}")
if len(rpc_id) != constants.rpc_id_length: if len(rpc_id) != constants.RPC_ID_LENGTH:
raise ValueError(f"invalid rpc node_id: {len(rpc_id)} bytes (expected 20)") raise ValueError(f"invalid rpc node_id: {len(rpc_id)} bytes (expected 20)")
if not len(node_id) == constants.hash_length: if not len(node_id) == constants.HASH_LENGTH:
raise ValueError(f"invalid node node_id: {len(node_id)} bytes (expected 48)") raise ValueError(f"invalid node node_id: {len(node_id)} bytes (expected 48)")
self.rpc_id = rpc_id self.rpc_id = rpc_id
self.node_id = node_id self.node_id = node_id
@ -77,18 +77,18 @@ class RequestDatagram(KademliaDatagramBase):
@classmethod @classmethod
def make_ping(cls, from_node_id: bytes, rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram': def make_ping(cls, from_node_id: bytes, rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram':
rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH]
return cls(REQUEST_TYPE, rpc_id, from_node_id, b'ping') return cls(REQUEST_TYPE, rpc_id, from_node_id, b'ping')
@classmethod @classmethod
def make_store(cls, from_node_id: bytes, blob_hash: bytes, token: bytes, port: int, def make_store(cls, from_node_id: bytes, blob_hash: bytes, token: bytes, port: int,
rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram': rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram':
rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH]
if len(blob_hash) != constants.hash_bits // 8: if len(blob_hash) != constants.HASH_BITS // 8:
raise ValueError(f"invalid blob hash length: {len(blob_hash)}") raise ValueError(f"invalid blob hash length: {len(blob_hash)}")
if not 0 < port < 65536: if not 0 < port < 65536:
raise ValueError(f"invalid port: {port}") raise ValueError(f"invalid port: {port}")
if len(token) != constants.hash_bits // 8: if len(token) != constants.HASH_BITS // 8:
raise ValueError(f"invalid token length: {len(token)}") raise ValueError(f"invalid token length: {len(token)}")
store_args = [blob_hash, token, port, from_node_id, 0] store_args = [blob_hash, token, port, from_node_id, 0]
return cls(REQUEST_TYPE, rpc_id, from_node_id, b'store', store_args) return cls(REQUEST_TYPE, rpc_id, from_node_id, b'store', store_args)
@ -96,16 +96,16 @@ class RequestDatagram(KademliaDatagramBase):
@classmethod @classmethod
def make_find_node(cls, from_node_id: bytes, key: bytes, def make_find_node(cls, from_node_id: bytes, key: bytes,
rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram': rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram':
rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH]
if len(key) != constants.hash_bits // 8: if len(key) != constants.HASH_BITS // 8:
raise ValueError(f"invalid key length: {len(key)}") raise ValueError(f"invalid key length: {len(key)}")
return cls(REQUEST_TYPE, rpc_id, from_node_id, b'findNode', [key]) return cls(REQUEST_TYPE, rpc_id, from_node_id, b'findNode', [key])
@classmethod @classmethod
def make_find_value(cls, from_node_id: bytes, key: bytes, def make_find_value(cls, from_node_id: bytes, key: bytes,
rpc_id: typing.Optional[bytes] = None, page: int = 0) -> 'RequestDatagram': rpc_id: typing.Optional[bytes] = None, page: int = 0) -> 'RequestDatagram':
rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH]
if len(key) != constants.hash_bits // 8: if len(key) != constants.HASH_BITS // 8:
raise ValueError(f"invalid key length: {len(key)}") raise ValueError(f"invalid key length: {len(key)}")
if page < 0: if page < 0:
raise ValueError(f"cannot request a negative page ({page})") raise ValueError(f"cannot request a negative page ({page})")
@ -179,7 +179,7 @@ def make_compact_address(node_id: bytes, address: str, port: int) -> bytearray:
compact_ip = make_compact_ip(address) compact_ip = make_compact_ip(address)
if not 0 < port < 65536: if not 0 < port < 65536:
raise ValueError(f'Invalid port: {port}') raise ValueError(f'Invalid port: {port}')
if len(node_id) != constants.hash_bits // 8: if len(node_id) != constants.HASH_BITS // 8:
raise ValueError(f"invalid node node_id length") raise ValueError(f"invalid node node_id length")
return compact_ip + port.to_bytes(2, 'big') + node_id return compact_ip + port.to_bytes(2, 'big') + node_id
@ -190,6 +190,6 @@ def decode_compact_address(compact_address: bytes) -> typing.Tuple[bytes, str, i
node_id = compact_address[6:] node_id = compact_address[6:]
if not 0 < port < 65536: if not 0 < port < 65536:
raise ValueError(f'Invalid port: {port}') raise ValueError(f'Invalid port: {port}')
if len(node_id) != constants.hash_bits // 8: if len(node_id) != constants.HASH_BITS // 8:
raise ValueError(f"invalid node node_id length") raise ValueError(f"invalid node node_id length")
return node_id, address, port return node_id, address, port

View file

@ -10,7 +10,7 @@ from lbry.conf import Config
from lbry.wallet.dewies import dewies_to_lbc, lbc_to_dewies from lbry.wallet.dewies import dewies_to_lbc, lbc_to_dewies
from lbry.wallet.transaction import Transaction from lbry.wallet.transaction import Transaction
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.dht.constants import data_expiration from lbry.dht.constants import DATA_EXPIRATION
from lbry.blob.blob_info import BlobInfo from lbry.blob.blob_info import BlobInfo
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
@ -386,7 +386,7 @@ class SQLiteStorage(SQLiteMixin):
return transaction.executemany( return transaction.executemany(
"update blob set next_announce_time=?, last_announced_time=?, single_announce=0 " "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 "
"where blob_hash=?", "where blob_hash=?",
((int(last_announced + (data_expiration / 2)), int(last_announced), blob_hash) ((int(last_announced + (DATA_EXPIRATION / 2)), int(last_announced), blob_hash)
for blob_hash in blob_hashes) for blob_hash in blob_hashes)
).fetchall() ).fetchall()
return self.db.run(_update_last_announced_blobs) return self.db.run(_update_last_announced_blobs)

View file

@ -26,7 +26,7 @@ class TestKBucket(AsyncioTestCase):
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.address_generator = address_generator() self.address_generator = address_generator()
self.peer_manager = PeerManager(self.loop) self.peer_manager = PeerManager(self.loop)
self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id()) self.kbucket = KBucket(self.peer_manager, 0, 2 ** constants.HASH_BITS, generate_id())
def test_add_peer(self): def test_add_peer(self):
peer = make_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4444) peer = make_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4444)
@ -58,7 +58,7 @@ class TestKBucket(AsyncioTestCase):
# Test if contacts can be added to empty list # Test if contacts can be added to empty list
# Add k contacts to bucket # Add k contacts to bucket
for i in range(constants.k): for i in range(constants.K):
peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444)
self.assertTrue(self.kbucket.add_peer(peer)) self.assertTrue(self.kbucket.add_peer(peer))
self.assertEqual(peer, self.kbucket.peers[i]) self.assertEqual(peer, self.kbucket.peers[i])
@ -130,7 +130,7 @@ class TestKBucket(AsyncioTestCase):
added = [] added = []
# Add couple contacts # Add couple contacts
for i in range(constants.k-2): for i in range(constants.K - 2):
peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444)
self.assertTrue(self.kbucket.add_peer(peer)) self.assertTrue(self.kbucket.add_peer(peer))
added.append(peer) added.append(peer)

View file

@ -147,13 +147,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
self.assertNotIn(blob_hash, peers_for_blob) self.assertNotIn(blob_hash, peers_for_blob)
self.assertEqual(peers_for_blob[b'p'], 0) self.assertEqual(peers_for_blob[b'p'], 0)
else: else:
self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.k)) self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K))
self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i) self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i)
if i - 1 > constants.k: if i - 1 > constants.K:
self.assertEqual(len(peers_for_blob[b'contacts']), constants.k) self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.k + 1)) + 1) self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1)
seen = set(peers_for_blob[blob_hash]) seen = set(peers_for_blob[blob_hash])
self.assertEqual(len(seen), constants.k) self.assertEqual(len(seen), constants.K)
self.assertEqual(len(peers_for_blob[blob_hash]), len(seen)) self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))
for pg in range(1, peers_for_blob[b'p']): for pg in range(1, peers_for_blob[b'p']):