lint dht: datastore protocol
This commit is contained in:
parent
efb5f232f7
commit
20c46677d0
3 changed files with 41 additions and 36 deletions
|
@ -53,7 +53,7 @@ class DictDataStore:
|
||||||
now = self.loop.time()
|
now = self.loop.time()
|
||||||
if key in self._data_store:
|
if key in self._data_store:
|
||||||
current = list(filter(lambda x: x[0] == contact, self._data_store[key]))
|
current = list(filter(lambda x: x[0] == contact, self._data_store[key]))
|
||||||
if len(current):
|
if len(current) > 0:
|
||||||
self._data_store[key][self._data_store[key].index(current[0])] = contact, now
|
self._data_store[key][self._data_store[key].index(current[0])] = contact, now
|
||||||
else:
|
else:
|
||||||
self._data_store[key].append((contact, now))
|
self._data_store[key].append((contact, now))
|
||||||
|
@ -65,6 +65,6 @@ class DictDataStore:
|
||||||
|
|
||||||
def get_storing_contacts(self) -> typing.List['KademliaPeer']:
|
def get_storing_contacts(self) -> typing.List['KademliaPeer']:
|
||||||
peers = set()
|
peers = set()
|
||||||
for key, stored in self._data_store.items():
|
for _, stored in self._data_store.items():
|
||||||
peers.update(set(map(lambda tup: tup[0], stored)))
|
peers.update(set(map(lambda tup: tup[0], stored)))
|
||||||
return list(peers)
|
return list(peers)
|
||||||
|
|
|
@ -23,7 +23,7 @@ if typing.TYPE_CHECKING:
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
old_protocol_errors = {
|
OLD_PROTOCOL_ERRORS = {
|
||||||
"findNode() takes exactly 2 arguments (5 given)": "0.19.1",
|
"findNode() takes exactly 2 arguments (5 given)": "0.19.1",
|
||||||
"findValue() takes exactly 2 arguments (5 given)": "0.19.1"
|
"findValue() takes exactly 2 arguments (5 given)": "0.19.1"
|
||||||
}
|
}
|
||||||
|
@ -326,9 +326,9 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
if not peer.node_id:
|
if not peer.node_id:
|
||||||
log.warning("Tried adding a peer with no node id!")
|
log.warning("Tried adding a peer with no node id!")
|
||||||
return False
|
return False
|
||||||
for p in self.routing_table.get_peers():
|
for my_peer in self.routing_table.get_peers():
|
||||||
if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
|
if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
|
||||||
self.routing_table.remove_peer(p)
|
self.routing_table.remove_peer(my_peer)
|
||||||
self.routing_table.join_buckets()
|
self.routing_table.join_buckets()
|
||||||
bucket_index = self.routing_table.kbucket_index(peer.node_id)
|
bucket_index = self.routing_table.kbucket_index(peer.node_id)
|
||||||
if self.routing_table.buckets[bucket_index].add_peer(peer):
|
if self.routing_table.buckets[bucket_index].add_peer(peer):
|
||||||
|
@ -367,10 +367,10 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
|
|
||||||
not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
|
not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
|
||||||
not_recently_replied = []
|
not_recently_replied = []
|
||||||
for p in not_good_contacts:
|
for my_peer in not_good_contacts:
|
||||||
last_replied = self.peer_manager.get_last_replied(p.address, p.udp_port)
|
last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
|
||||||
if not last_replied or last_replied + 60 < self.loop.time():
|
if not last_replied or last_replied + 60 < self.loop.time():
|
||||||
not_recently_replied.append(p)
|
not_recently_replied.append(my_peer)
|
||||||
if not_recently_replied:
|
if not_recently_replied:
|
||||||
to_replace = not_recently_replied[0]
|
to_replace = not_recently_replied[0]
|
||||||
else:
|
else:
|
||||||
|
@ -421,20 +421,20 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
raise AttributeError('Invalid method: %s' % message.method.decode())
|
raise AttributeError('Invalid method: %s' % message.method.decode())
|
||||||
if message.args and isinstance(message.args[-1], dict) and b'protocolVersion' in message.args[-1]:
|
if message.args and isinstance(message.args[-1], dict) and b'protocolVersion' in message.args[-1]:
|
||||||
# args don't need reformatting
|
# args don't need reformatting
|
||||||
args, kw = tuple(message.args[:-1]), message.args[-1]
|
args, kwargs = tuple(message.args[:-1]), message.args[-1]
|
||||||
else:
|
else:
|
||||||
args, kw = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args)
|
args, kwargs = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args)
|
||||||
log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
|
log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
|
||||||
sender_contact.address, sender_contact.udp_port)
|
sender_contact.address, sender_contact.udp_port)
|
||||||
|
|
||||||
if method == b'ping':
|
if method == b'ping':
|
||||||
result = self.node_rpc.ping()
|
result = self.node_rpc.ping()
|
||||||
elif method == b'store':
|
elif method == b'store':
|
||||||
blob_hash, token, port, original_publisher_id, age = args[:5]
|
blob_hash, token, port, original_publisher_id, age = args[:5] # pylint: disable=unused-variable
|
||||||
result = self.node_rpc.store(sender_contact, blob_hash, token, port)
|
result = self.node_rpc.store(sender_contact, blob_hash, token, port)
|
||||||
else:
|
else:
|
||||||
key = args[0]
|
key = args[0]
|
||||||
page = kw.get(PAGE_KEY, 0)
|
page = kwargs.get(PAGE_KEY, 0)
|
||||||
if method == b'findNode':
|
if method == b'findNode':
|
||||||
result = self.node_rpc.find_node(sender_contact, key)
|
result = self.node_rpc.find_node(sender_contact, key)
|
||||||
else:
|
else:
|
||||||
|
@ -484,25 +484,25 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram):
|
def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram):
|
||||||
# Find the message that triggered this response
|
# Find the message that triggered this response
|
||||||
if response_datagram.rpc_id in self.sent_messages:
|
if response_datagram.rpc_id in self.sent_messages:
|
||||||
peer, df, request = self.sent_messages[response_datagram.rpc_id]
|
peer, future, _ = self.sent_messages[response_datagram.rpc_id]
|
||||||
if peer.address != address[0]:
|
if peer.address != address[0]:
|
||||||
df.set_exception(RemoteException(
|
future.set_exception(
|
||||||
f"response from {address[0]}, expected {peer.address}")
|
RemoteException(f"response from {address[0]}, expected {peer.address}")
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# We got a result from the RPC
|
# We got a result from the RPC
|
||||||
if peer.node_id == self.node_id:
|
if peer.node_id == self.node_id:
|
||||||
df.set_exception(RemoteException("node has our node id"))
|
future.set_exception(RemoteException("node has our node id"))
|
||||||
return
|
return
|
||||||
elif response_datagram.node_id == self.node_id:
|
elif response_datagram.node_id == self.node_id:
|
||||||
df.set_exception(RemoteException("incoming message is from our node id"))
|
future.set_exception(RemoteException("incoming message is from our node id"))
|
||||||
return
|
return
|
||||||
peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1])
|
peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1])
|
||||||
self.peer_manager.report_last_replied(address[0], address[1])
|
self.peer_manager.report_last_replied(address[0], address[1])
|
||||||
self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
|
self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
|
||||||
if not df.cancelled():
|
if not future.cancelled():
|
||||||
df.set_result(response_datagram)
|
future.set_result(response_datagram)
|
||||||
self.add_peer(peer)
|
self.add_peer(peer)
|
||||||
else:
|
else:
|
||||||
log.warning("%s:%i replied, but after we cancelled the request attempt",
|
log.warning("%s:%i replied, but after we cancelled the request attempt",
|
||||||
|
@ -516,11 +516,13 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
# The RPC request raised a remote exception; raise it locally
|
# The RPC request raised a remote exception; raise it locally
|
||||||
remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})")
|
remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})")
|
||||||
if error_datagram.rpc_id in self.sent_messages:
|
if error_datagram.rpc_id in self.sent_messages:
|
||||||
peer, df, request = self.sent_messages.pop(error_datagram.rpc_id)
|
peer, future, request = self.sent_messages.pop(error_datagram.rpc_id)
|
||||||
if (peer.address, peer.udp_port) != address:
|
if (peer.address, peer.udp_port) != address:
|
||||||
df.set_exception(RemoteException(
|
future.set_exception(
|
||||||
f"response from {address[0]}:{address[1]}, "
|
RemoteException(
|
||||||
f"expected {peer.address}:{peer.udp_port}")
|
f"response from {address[0]}:{address[1]}, "
|
||||||
|
f"expected {peer.address}:{peer.udp_port}"
|
||||||
|
)
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
error_msg = f"" \
|
error_msg = f"" \
|
||||||
|
@ -529,23 +531,27 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
f"Raised: {str(remote_exception)}"
|
f"Raised: {str(remote_exception)}"
|
||||||
if 'Invalid token' in error_msg:
|
if 'Invalid token' in error_msg:
|
||||||
log.debug(error_msg)
|
log.debug(error_msg)
|
||||||
elif error_datagram.response not in old_protocol_errors:
|
elif error_datagram.response not in OLD_PROTOCOL_ERRORS:
|
||||||
log.warning(error_msg)
|
log.warning(error_msg)
|
||||||
else:
|
else:
|
||||||
log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
log.debug(
|
||||||
peer.address, peer.udp_port, old_protocol_errors[error_datagram.response])
|
"known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
||||||
df.set_exception(remote_exception)
|
peer.address, peer.udp_port, OLD_PROTOCOL_ERRORS[error_datagram.response]
|
||||||
|
)
|
||||||
|
future.set_exception(remote_exception)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
if error_datagram.response not in old_protocol_errors:
|
if error_datagram.response not in OLD_PROTOCOL_ERRORS:
|
||||||
msg = f"Received error from {address[0]}:{address[1]}, but it isn't in response to a " \
|
msg = f"Received error from {address[0]}:{address[1]}, but it isn't in response to a " \
|
||||||
f"pending request: {str(remote_exception)}"
|
f"pending request: {str(remote_exception)}"
|
||||||
log.warning(msg)
|
log.warning(msg)
|
||||||
else:
|
else:
|
||||||
log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
log.debug(
|
||||||
address[0], address[1], old_protocol_errors[error_datagram.response])
|
"known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
||||||
|
address[0], address[1], OLD_PROTOCOL_ERRORS[error_datagram.response]
|
||||||
|
)
|
||||||
|
|
||||||
def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None:
|
def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-differ
|
||||||
try:
|
try:
|
||||||
message = decode_datagram(datagram)
|
message = decode_datagram(datagram)
|
||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError):
|
||||||
|
@ -646,7 +652,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer',
|
async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer', # pylint: disable=too-many-return-statements
|
||||||
retry: bool = True) -> typing.Tuple[bytes, bool]:
|
retry: bool = True) -> typing.Tuple[bytes, bool]:
|
||||||
async def __store():
|
async def __store():
|
||||||
res = await self.get_rpc_peer(peer).store(hash_value)
|
res = await self.get_rpc_peer(peer).store(hash_value)
|
||||||
|
@ -661,7 +667,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
||||||
return peer.node_id, False
|
return peer.node_id, False
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
log.error("Unexpected response: %s" % err)
|
log.error("Unexpected response: %s", err)
|
||||||
return peer.node_id, False
|
return peer.node_id, False
|
||||||
except RemoteException as err:
|
except RemoteException as err:
|
||||||
if 'findValue() takes exactly 2 arguments (5 given)' in str(err):
|
if 'findValue() takes exactly 2 arguments (5 given)' in str(err):
|
||||||
|
|
|
@ -52,8 +52,7 @@ def _bdecode(data: bytes, start_index: int = 0) -> typing.Tuple[typing.Union[int
|
||||||
raise DecodeError(err)
|
raise DecodeError(err)
|
||||||
start_index = split_pos + 1
|
start_index = split_pos + 1
|
||||||
end_pos = start_index + length
|
end_pos = start_index + length
|
||||||
b = data[start_index:end_pos]
|
return data[start_index:end_pos], end_pos
|
||||||
return b, end_pos
|
|
||||||
|
|
||||||
|
|
||||||
def bencode(data: typing.Dict) -> bytes:
|
def bencode(data: typing.Dict) -> bytes:
|
||||||
|
|
Loading…
Reference in a new issue