Merge pull request #2489 from lbryio/dht_object_cache

DHT object cache revamp
This commit is contained in:
Jack Robison 2019-10-07 12:27:07 -04:00 committed by GitHub
commit 8c0196301d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 110 additions and 102 deletions

View file

@ -4,13 +4,14 @@ import typing
import binascii import binascii
from lbry.utils import resolve_host from lbry.utils import resolve_host
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.peer import make_kademlia_peer
from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.distance import Distance
from lbry.dht.protocol.iterative_find import IterativeNodeFinder, IterativeValueFinder from lbry.dht.protocol.iterative_find import IterativeNodeFinder, IterativeValueFinder
from lbry.dht.protocol.protocol import KademliaProtocol from lbry.dht.protocol.protocol import KademliaProtocol
from lbry.dht.peer import KademliaPeer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.dht.peer import PeerManager from lbry.dht.peer import PeerManager
from lbry.dht.peer import KademliaPeer
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -141,7 +142,7 @@ class Node:
if known_node_addresses: if known_node_addresses:
peers = [ peers = [
KademliaPeer(self.loop, address, udp_port=port) make_kademlia_peer(None, address, port)
for (address, port) in known_node_addresses for (address, port) in known_node_addresses
] ]
while True: while True:
@ -232,7 +233,7 @@ class Node:
if not peer.udp_port: if not peer.udp_port:
udp_port_to_try = peer.tcp_port udp_port_to_try = peer.tcp_port
if not peer.udp_port: if not peer.udp_port:
peer.update_udp_port(udp_port_to_try) peer = make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port)
self.loop.create_task(ping(peer)) self.loop.create_task(ping(peer))
else: else:
log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)

View file

@ -3,6 +3,7 @@ import asyncio
import logging import logging
import ipaddress import ipaddress
from binascii import hexlify from binascii import hexlify
from dataclasses import dataclass, field
from functools import lru_cache from functools import lru_cache
from lbry.dht import constants from lbry.dht import constants
@ -11,6 +12,13 @@ from lbry.dht.serialization.datagram import make_compact_address, make_compact_i
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@lru_cache(1024)
def make_kademlia_peer(node_id: typing.Optional[bytes], address: typing.Optional[str],
udp_port: typing.Optional[int] = None,
tcp_port: typing.Optional[int] = None) -> 'KademliaPeer':
return KademliaPeer(address, node_id, udp_port, tcp_port=tcp_port)
def is_valid_ipv4(address): def is_valid_ipv4(address):
try: try:
ip = ipaddress.ip_address(address) ip = ipaddress.ip_address(address)
@ -81,10 +89,6 @@ class PeerManager:
self._node_id_mapping[(address, udp_port)] = node_id self._node_id_mapping[(address, udp_port)] = node_id
self._node_id_reverse_mapping[node_id] = (address, udp_port) self._node_id_reverse_mapping[node_id] = (address, udp_port)
@lru_cache(maxsize=400)
def get_kademlia_peer(self, node_id: bytes, address: str, udp_port: int) -> 'KademliaPeer':
return KademliaPeer(self._loop, address, node_id, udp_port)
def prune(self): # TODO: periodically call this def prune(self): # TODO: periodically call this
now = self._loop.time() now = self._loop.time()
to_pop = [] to_pop = []
@ -116,6 +120,8 @@ class PeerManager:
previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None)) previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None))
last_requested = self._last_requested.get((address, udp_port)) last_requested = self._last_requested.get((address, udp_port))
last_replied = self._last_replied.get((address, udp_port)) last_replied = self._last_replied.get((address, udp_port))
if node_id is None:
return None
if most_recent_failure and last_replied: if most_recent_failure and last_replied:
if delay < last_replied > most_recent_failure: if delay < last_replied > most_recent_failure:
return True return True
@ -135,47 +141,31 @@ class PeerManager:
def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer':
node_id, address, tcp_port = decode_compact_address(compact_address) node_id, address, tcp_port = decode_compact_address(compact_address)
return KademliaPeer(self._loop, address, node_id, tcp_port=tcp_port) return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
@dataclass(unsafe_hash=True)
class KademliaPeer: class KademliaPeer:
__slots__ = [ address: str = field(hash=True)
'loop', _node_id: typing.Optional[bytes] = field(hash=True)
'_node_id', udp_port: typing.Optional[int] = field(hash=True)
'address', tcp_port: typing.Optional[int] = field(compare=False, hash=False)
'udp_port', protocol_version: typing.Optional[int] = field(default=1, compare=False, hash=False)
'tcp_port',
'protocol_version',
]
def __init__(self, loop: asyncio.AbstractEventLoop, address: str, node_id: typing.Optional[bytes] = None, def __post_init__(self):
udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None): if self._node_id is not None:
if node_id is not None: if not len(self._node_id) == constants.hash_length:
if not len(node_id) == constants.hash_length: raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode()))
raise ValueError("invalid node_id: {}".format(hexlify(node_id).decode())) if self.udp_port is not None and not 1 <= self.udp_port <= 65535:
if udp_port is not None and not 0 <= udp_port <= 65536:
raise ValueError("invalid udp port") raise ValueError("invalid udp port")
if tcp_port and not 0 <= tcp_port <= 65536: if self.tcp_port is not None and not 1 <= self.tcp_port <= 65535:
raise ValueError("invalid tcp port") raise ValueError("invalid tcp port")
if not is_valid_ipv4(address): if not is_valid_ipv4(self.address):
raise ValueError("invalid ip address") raise ValueError("invalid ip address")
self.loop = loop
self._node_id = node_id
self.address = address
self.udp_port = udp_port
self.tcp_port = tcp_port
self.protocol_version = 1
def update_tcp_port(self, tcp_port: int): def update_tcp_port(self, tcp_port: int):
self.tcp_port = tcp_port self.tcp_port = tcp_port
def update_udp_port(self, udp_port: int):
self.udp_port = udp_port
def set_id(self, node_id):
if not self._node_id:
self._node_id = node_id
@property @property
def node_id(self) -> bytes: def node_id(self) -> bytes:
return self._node_id return self._node_id
@ -188,11 +178,3 @@ class KademliaPeer:
def compact_ip(self): def compact_ip(self):
return make_compact_ip(self.address) return make_compact_ip(self.address)
def __eq__(self, other):
if not isinstance(other, KademliaPeer):
raise TypeError("invalid type to compare with Peer: %s" % str(type(other)))
return (self.node_id, self.address, self.udp_port) == (other.node_id, other.address, other.udp_port)
def __hash__(self):
return hash((self.node_id, self.address, self.udp_port))

View file

@ -7,6 +7,7 @@ import logging
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.error import RemoteException, TransportNotConnected
from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.distance import Distance
from lbry.dht.peer import make_kademlia_peer
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
@ -150,7 +151,7 @@ class IterativeFinder:
self._add_active(peer) self._add_active(peer)
for contact_triple in response.get_close_triples(): for contact_triple in response.get_close_triples():
node_id, address, udp_port = contact_triple node_id, address, udp_port = contact_triple
self._add_active(self.peer_manager.get_kademlia_peer(node_id, address, udp_port)) self._add_active(make_kademlia_peer(node_id, address, udp_port))
self.check_result_ready(response) self.check_result_ready(response)
async def _send_probe(self, peer: 'KademliaPeer'): async def _send_probe(self, peer: 'KademliaPeer'):

View file

@ -15,6 +15,7 @@ from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY
from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.error import RemoteException, TransportNotConnected
from lbry.dht.protocol.routing_table import TreeRoutingTable from lbry.dht.protocol.routing_table import TreeRoutingTable
from lbry.dht.protocol.data_store import DictDataStore from lbry.dht.protocol.data_store import DictDataStore
from lbry.dht.peer import make_kademlia_peer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.dht.peer import PeerManager, KademliaPeer from lbry.dht.peer import PeerManager, KademliaPeer
@ -322,6 +323,9 @@ class KademliaProtocol(DatagramProtocol):
return args, {} return args, {}
async def _add_peer(self, peer: 'KademliaPeer'): async def _add_peer(self, peer: 'KademliaPeer'):
if not peer.node_id:
log.warning("Tried adding a peer with no node id!")
return False
for p in self.routing_table.get_peers(): for p in self.routing_table.get_peers():
if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id: if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
self.routing_table.remove_peer(p) self.routing_table.remove_peer(p)
@ -447,7 +451,7 @@ class KademliaProtocol(DatagramProtocol):
try: try:
peer = self.routing_table.get_peer(request_datagram.node_id) peer = self.routing_table.get_peer(request_datagram.node_id)
except IndexError: except IndexError:
peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1]) peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
try: try:
self._handle_rpc(peer, request_datagram) self._handle_rpc(peer, request_datagram)
# if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it
@ -494,8 +498,7 @@ class KademliaProtocol(DatagramProtocol):
elif response_datagram.node_id == self.node_id: elif response_datagram.node_id == self.node_id:
df.set_exception(RemoteException("incoming message is from our node id")) df.set_exception(RemoteException("incoming message is from our node id"))
return return
peer.set_id(response_datagram.node_id) peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1])
peer.update_udp_port(address[1])
self.peer_manager.report_last_replied(address[0], address[1]) self.peer_manager.report_last_replied(address[0], address[1])
self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1]) self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
if not df.cancelled(): if not df.cancelled():

View file

@ -23,6 +23,7 @@ from lbry import utils
from lbry.conf import Config, Setting from lbry.conf import Config, Setting
from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer
from lbry.blob_exchange.downloader import download_blob from lbry.blob_exchange.downloader import download_blob
from lbry.dht.peer import make_kademlia_peer
from lbry.error import DownloadSDTimeout, ComponentsNotStarted from lbry.error import DownloadSDTimeout, ComponentsNotStarted
from lbry.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet from lbry.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet
from lbry.extras import system_info from lbry.extras import system_info
@ -3655,8 +3656,7 @@ class Daemon(metaclass=JSONRPCServerType):
""" """
peer = None peer = None
if node_id and address and port: if node_id and address and port:
peer = self.component_manager.peer_manager.get_kademlia_peer(unhexlify(node_id), address, peer = make_kademlia_peer(unhexlify(node_id), address, udp_port=int(port))
udp_port=int(port))
try: try:
return await self.dht_node.protocol.get_rpc_peer(peer).ping() return await self.dht_node.protocol.get_rpc_peer(peer).ping()
except asyncio.TimeoutError: except asyncio.TimeoutError:

View file

@ -2,11 +2,12 @@ import asyncio
import typing import typing
import logging import logging
import binascii import binascii
from lbry.dht.peer import make_kademlia_peer
from lbry.error import DownloadSDTimeout from lbry.error import DownloadSDTimeout
from lbry.utils import resolve_host, lru_cache_concurrent from lbry.utils import resolve_host, lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader from lbry.blob_exchange.downloader import BlobDownloader
from lbry.dht.peer import KademliaPeer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.conf import Config from lbry.conf import Config
from lbry.dht.node import Node from lbry.dht.node import Node
@ -50,7 +51,7 @@ class StreamDownloader:
def _delayed_add_fixed_peers(): def _delayed_add_fixed_peers():
self.added_fixed_peers = True self.added_fixed_peers = True
self.peer_queue.put_nowait([ self.peer_queue.put_nowait([
KademliaPeer(self.loop, address=address, tcp_port=port + 1) make_kademlia_peer(None, address, None, tcp_port=port + 1)
for address, port in addresses for address, port in addresses
]) ])

View file

@ -3,7 +3,7 @@ from binascii import hexlify
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.dht.peer import PeerManager, KademliaPeer from lbry.dht.peer import PeerManager, make_kademlia_peer
from torba.testcase import AsyncioTestCase from torba.testcase import AsyncioTestCase
@ -39,7 +39,7 @@ class DHTIntegrationTest(AsyncioTestCase):
bad_peers = [] bad_peers = []
for candidate in self.nodes[1:10]: for candidate in self.nodes[1:10]:
address, port, node_id = candidate.protocol.external_ip, candidate.protocol.udp_port, candidate.protocol.node_id address, port, node_id = candidate.protocol.external_ip, candidate.protocol.udp_port, candidate.protocol.node_id
peer = KademliaPeer(self.loop, address, node_id, port) peer = make_kademlia_peer(node_id, address, udp_port=port)
bad_peers.append(peer) bad_peers.append(peer)
node.protocol.add_peer(peer) node.protocol.add_peer(peer)
candidate.stop() candidate.stop()
@ -102,7 +102,7 @@ class DHTIntegrationTest(AsyncioTestCase):
node2.stop() node2.stop()
# forcefully make it a bad peer but don't remove it from routing table # forcefully make it a bad peer but don't remove it from routing table
address, port, node_id = node2.protocol.external_ip, node2.protocol.udp_port, node2.protocol.node_id address, port, node_id = node2.protocol.external_ip, node2.protocol.udp_port, node2.protocol.node_id
peer = KademliaPeer(self.loop, address, node_id, port) peer = make_kademlia_peer(node_id, address, udp_port=port)
self.assertTrue(node1.protocol.peer_manager.peer_is_good(peer)) self.assertTrue(node1.protocol.peer_manager.peer_is_good(peer))
node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port)
node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port)

View file

@ -12,7 +12,8 @@ from lbry.extras.daemon.storage import SQLiteStorage
from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_manager import BlobManager
from lbry.blob_exchange.server import BlobServer, BlobServerProtocol from lbry.blob_exchange.server import BlobServer, BlobServerProtocol
from lbry.blob_exchange.client import request_blob from lbry.blob_exchange.client import request_blob
from lbry.dht.peer import KademliaPeer, PeerManager from lbry.dht.peer import PeerManager, make_kademlia_peer
# import logging # import logging
# logging.getLogger("lbry").setLevel(logging.DEBUG) # logging.getLogger("lbry").setLevel(logging.DEBUG)
@ -43,7 +44,7 @@ class BlobExchangeTestBase(AsyncioTestCase):
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite")) self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config) self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
self.client_peer_manager = PeerManager(self.loop) self.client_peer_manager = PeerManager(self.loop)
self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) self.server_from_client = make_kademlia_peer(b'1' * 48, "127.0.0.1", tcp_port=33333)
await self.client_storage.open() await self.client_storage.open()
await self.server_storage.open() await self.server_storage.open()
@ -102,7 +103,7 @@ class TestBlobExchange(BlobExchangeTestBase):
second_client_blob_manager = BlobManager( second_client_blob_manager = BlobManager(
self.loop, second_client_dir, second_client_storage, second_client_conf self.loop, second_client_dir, second_client_storage, second_client_conf
) )
server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) server_from_second_client = make_kademlia_peer(b'1' * 48, "127.0.0.1", tcp_port=33333)
await second_client_storage.open() await second_client_storage.open()
await second_client_blob_manager.setup() await second_client_blob_manager.setup()
@ -193,7 +194,7 @@ class TestBlobExchange(BlobExchangeTestBase):
second_client_blob_manager = BlobManager( second_client_blob_manager = BlobManager(
self.loop, second_client_dir, second_client_storage, second_client_conf self.loop, second_client_dir, second_client_storage, second_client_conf
) )
server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) server_from_second_client = make_kademlia_peer(b'1' * 48, "127.0.0.1", tcp_port=33333)
await second_client_storage.open() await second_client_storage.open()
await second_client_blob_manager.setup() await second_client_blob_manager.setup()

View file

@ -1,7 +1,7 @@
import asyncio import asyncio
from unittest import mock, TestCase from unittest import mock, TestCase
from lbry.dht.protocol.data_store import DictDataStore from lbry.dht.protocol.data_store import DictDataStore
from lbry.dht.peer import PeerManager from lbry.dht.peer import PeerManager, make_kademlia_peer
class DataStoreTests(TestCase): class DataStoreTests(TestCase):
@ -13,7 +13,7 @@ class DataStoreTests(TestCase):
def _test_add_peer_to_blob(self, blob=b'2' * 48, node_id=b'1' * 48, address='1.2.3.4', tcp_port=3333, def _test_add_peer_to_blob(self, blob=b'2' * 48, node_id=b'1' * 48, address='1.2.3.4', tcp_port=3333,
udp_port=4444): udp_port=4444):
peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port) peer = make_kademlia_peer(node_id, address, udp_port)
peer.update_tcp_port(tcp_port) peer.update_tcp_port(tcp_port)
before = self.data_store.get_peers_for_blob(blob) before = self.data_store.get_peers_for_blob(blob)
self.data_store.add_peer_to_blob(peer, blob) self.data_store.add_peer_to_blob(peer, blob)

View file

@ -2,7 +2,7 @@ import struct
import asyncio import asyncio
from lbry.utils import generate_id from lbry.utils import generate_id
from lbry.dht.protocol.routing_table import KBucket from lbry.dht.protocol.routing_table import KBucket
from lbry.dht.peer import PeerManager, KademliaPeer from lbry.dht.peer import PeerManager, make_kademlia_peer
from lbry.dht import constants from lbry.dht import constants
from torba.testcase import AsyncioTestCase from torba.testcase import AsyncioTestCase
@ -29,8 +29,8 @@ class TestKBucket(AsyncioTestCase):
self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id()) self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id())
def test_add_peer(self): def test_add_peer(self):
peer = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4444) peer = make_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4444)
peer_update2 = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4445) peer_update2 = make_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4445)
self.assertListEqual([], self.kbucket.peers) self.assertListEqual([], self.kbucket.peers)
@ -59,12 +59,12 @@ class TestKBucket(AsyncioTestCase):
# Test if contacts can be added to empty list # Test if contacts can be added to empty list
# Add k contacts to bucket # Add k contacts to bucket
for i in range(constants.k): for i in range(constants.k):
peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444)
self.assertTrue(self.kbucket.add_peer(peer)) self.assertTrue(self.kbucket.add_peer(peer))
self.assertEqual(peer, self.kbucket.peers[i]) self.assertEqual(peer, self.kbucket.peers[i])
# Test if contact is not added to full list # Test if contact is not added to full list
peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444)
self.assertFalse(self.kbucket.add_peer(peer)) self.assertFalse(self.kbucket.add_peer(peer))
# Test if an existing contact is updated correctly if added again # Test if an existing contact is updated correctly if added again
@ -125,13 +125,13 @@ class TestKBucket(AsyncioTestCase):
def test_remove_peer(self): def test_remove_peer(self):
# try remove contact from empty list # try remove contact from empty list
peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444)
self.assertRaises(ValueError, self.kbucket.remove_peer, peer) self.assertRaises(ValueError, self.kbucket.remove_peer, peer)
added = [] added = []
# Add couple contacts # Add couple contacts
for i in range(constants.k-2): for i in range(constants.k-2):
peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444)
self.assertTrue(self.kbucket.add_peer(peer)) self.assertTrue(self.kbucket.add_peer(peer))
added.append(peer) added.append(peer)

View file

@ -5,7 +5,7 @@ from tests import dht_mocks
from lbry.dht.serialization.bencoding import bencode, bdecode from lbry.dht.serialization.bencoding import bencode, bdecode
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.protocol.protocol import KademliaProtocol from lbry.dht.protocol.protocol import KademliaProtocol
from lbry.dht.peer import PeerManager from lbry.dht.peer import PeerManager, make_kademlia_peer
class TestProtocol(AsyncioTestCase): class TestProtocol(AsyncioTestCase):
@ -22,7 +22,7 @@ class TestProtocol(AsyncioTestCase):
await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444))
peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) peer = make_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444)
result = await peer2.get_rpc_peer(peer).ping() result = await peer2.get_rpc_peer(peer).ping()
self.assertEqual(result, b'pong') self.assertEqual(result, b'pong')
peer1.stop() peer1.stop()
@ -43,7 +43,7 @@ class TestProtocol(AsyncioTestCase):
await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444))
peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) peer = make_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444)
self.assertEqual(None, peer2.peer_manager.get_node_token(peer.node_id)) self.assertEqual(None, peer2.peer_manager.get_node_token(peer.node_id))
await peer2.get_rpc_peer(peer).find_value(b'1' * 48) await peer2.get_rpc_peer(peer).find_value(b'1' * 48)
self.assertNotEqual(None, peer2.peer_manager.get_node_token(peer.node_id)) self.assertNotEqual(None, peer2.peer_manager.get_node_token(peer.node_id))
@ -65,12 +65,12 @@ class TestProtocol(AsyncioTestCase):
await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444))
peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) peer = make_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444)
peer2_from_peer1 = peer1.peer_manager.get_kademlia_peer( peer2_from_peer1 = make_kademlia_peer(
peer2.node_id, peer2.external_ip, udp_port=peer2.udp_port peer2.node_id, peer2.external_ip, udp_port=peer2.udp_port
) )
peer2_from_peer1.update_tcp_port(3333) peer2_from_peer1.update_tcp_port(3333)
peer3 = peer1.peer_manager.get_kademlia_peer( peer3 = make_kademlia_peer(
constants.generate_id(), '1.2.3.6', udp_port=4444 constants.generate_id(), '1.2.3.6', udp_port=4444
) )
store_result = await peer2.store_to_peer(b'2' * 48, peer) store_result = await peer2.store_to_peer(b'2' * 48, peer)
@ -103,7 +103,7 @@ class TestProtocol(AsyncioTestCase):
) )
await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444)) await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444))
proto.start() proto.start()
return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port) return proto, make_kademlia_peer(node_id, address, udp_port=udp_port)
async def test_add_peer_after_handle_request(self): async def test_add_peer_after_handle_request(self):
with dht_mocks.mock_network_loop(self.loop): with dht_mocks.mock_network_loop(self.loop):
@ -129,7 +129,7 @@ class TestProtocol(AsyncioTestCase):
peer1.routing_table.remove_peer(peer_2_from_peer_1) peer1.routing_table.remove_peer(peer_2_from_peer_1)
# peers not known by be good/bad should be enqueued to maybe-ping # peers not known by be good/bad should be enqueued to maybe-ping
peer1_from_peer3 = peer3.get_rpc_peer(peer3.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) peer1_from_peer3 = peer3.get_rpc_peer(make_kademlia_peer(node_id1, '1.2.3.4', 4444))
self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
pong = await peer1_from_peer3.ping() pong = await peer1_from_peer3.ping()
self.assertEqual(b'pong', pong) self.assertEqual(b'pong', pong)
@ -137,7 +137,7 @@ class TestProtocol(AsyncioTestCase):
peer1.ping_queue._pending_contacts.clear() peer1.ping_queue._pending_contacts.clear()
# peers who are already good should be added # peers who are already good should be added
peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) peer1_from_peer4 = peer4.get_rpc_peer(make_kademlia_peer(node_id1, '1.2.3.4', 4444))
peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444) peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444)
peer1.peer_manager.report_last_replied('1.2.3.7', 4444) peer1.peer_manager.report_last_replied('1.2.3.7', 4444)
self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
@ -149,7 +149,7 @@ class TestProtocol(AsyncioTestCase):
peer1.routing_table.buckets[0].peers.clear() peer1.routing_table.buckets[0].peers.clear()
# peers who are known to be bad recently should not be added or maybe-pinged # peers who are known to be bad recently should not be added or maybe-pinged
peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) peer1_from_peer4 = peer4.get_rpc_peer(make_kademlia_peer(node_id1, '1.2.3.4', 4444))
peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444) peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444)
peer1.peer_manager.report_failure('1.2.3.7', 4444) peer1.peer_manager.report_failure('1.2.3.7', 4444)
peer1.peer_manager.report_failure('1.2.3.7', 4444) peer1.peer_manager.report_failure('1.2.3.7', 4444)

View file

@ -3,8 +3,7 @@ from torba.testcase import AsyncioTestCase
from tests import dht_mocks from tests import dht_mocks
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.dht.peer import PeerManager from lbry.dht.peer import PeerManager, make_kademlia_peer
expected_ranges = [ expected_ranges = [
( (
@ -29,6 +28,7 @@ expected_ranges = [
) )
] ]
class TestRouting(AsyncioTestCase): class TestRouting(AsyncioTestCase):
async def test_fill_one_bucket(self): async def test_fill_one_bucket(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@ -53,7 +53,7 @@ class TestRouting(AsyncioTestCase):
for i in range(1, len(peer_addresses)): for i in range(1, len(peer_addresses)):
self.assertEqual(len(node_1.protocol.routing_table.get_peers()), contact_cnt) self.assertEqual(len(node_1.protocol.routing_table.get_peers()), contact_cnt)
node = nodes[i] node = nodes[i]
peer = node_1.protocol.peer_manager.get_kademlia_peer( peer = make_kademlia_peer(
node.protocol.node_id, node.protocol.external_ip, node.protocol.node_id, node.protocol.external_ip,
udp_port=node.protocol.udp_port udp_port=node.protocol.udp_port
) )
@ -66,6 +66,16 @@ class TestRouting(AsyncioTestCase):
for node in nodes.values(): for node in nodes.values():
node.protocol.stop() node.protocol.stop()
async def test_cant_add_peer_without_a_node_id_gracefully(self):
loop = asyncio.get_event_loop()
node = Node(loop, PeerManager(loop), constants.generate_id(), 4444, 4444, 3333, '1.2.3.4')
bad_peer = make_kademlia_peer(None, '1.2.3.4', 5555)
with self.assertLogs(level='WARNING') as logged:
self.assertFalse(await node.protocol._add_peer(bad_peer))
self.assertEqual(1, len(logged.output))
self.assertTrue(logged.output[0].endswith('Tried adding a peer with no node id!'))
async def test_split_buckets(self): async def test_split_buckets(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
peer_addresses = [ peer_addresses = [
@ -81,7 +91,7 @@ class TestRouting(AsyncioTestCase):
node_1 = nodes[0] node_1 = nodes[0]
for i in range(1, len(peer_addresses)): for i in range(1, len(peer_addresses)):
node = nodes[i] node = nodes[i]
peer = node_1.protocol.peer_manager.get_kademlia_peer( peer = make_kademlia_peer(
node.protocol.node_id, node.protocol.external_ip, node.protocol.node_id, node.protocol.external_ip,
udp_port=node.protocol.udp_port udp_port=node.protocol.udp_port
) )

View file

@ -8,7 +8,7 @@ from tests import dht_mocks
from lbry.conf import Config from lbry.conf import Config
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.dht.peer import PeerManager from lbry.dht.peer import PeerManager, make_kademlia_peer
from lbry.dht.blob_announcer import BlobAnnouncer from lbry.dht.blob_announcer import BlobAnnouncer
from lbry.extras.daemon.storage import SQLiteStorage from lbry.extras.daemon.storage import SQLiteStorage
@ -35,7 +35,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
self.nodes.update({len(self.nodes): n}) self.nodes.update({len(self.nodes): n})
if add_to_routing_table: if add_to_routing_table:
self.node.protocol.add_peer( self.node.protocol.add_peer(
self.peer_manager.get_kademlia_peer( make_kademlia_peer(
n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
) )
) )
@ -67,7 +67,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
await self.add_peer(node_id, address, False) await self.add_peer(node_id, address, False)
last_node = self.nodes[len(self.nodes) - 1] last_node = self.nodes[len(self.nodes) - 1]
peer = last_node.protocol.get_rpc_peer( peer = last_node.protocol.get_rpc_peer(
last_node.protocol.peer_manager.get_kademlia_peer( make_kademlia_peer(
previous_last_node.protocol.node_id, previous_last_node.protocol.external_ip, previous_last_node.protocol.node_id, previous_last_node.protocol.external_ip,
previous_last_node.protocol.udp_port previous_last_node.protocol.udp_port
) )
@ -129,12 +129,12 @@ class TestBlobAnnouncer(AsyncioTestCase):
announced_to = self.nodes[0] announced_to = self.nodes[0]
for i in range(1, peer_count): for i in range(1, peer_count):
node = self.nodes[i] node = self.nodes[i]
kad_peer = announced_to.protocol.peer_manager.get_kademlia_peer( kad_peer = make_kademlia_peer(
node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
) )
await announced_to.protocol._add_peer(kad_peer) await announced_to.protocol._add_peer(kad_peer)
peer = node.protocol.get_rpc_peer( peer = node.protocol.get_rpc_peer(
node.protocol.peer_manager.get_kademlia_peer( make_kademlia_peer(
announced_to.protocol.node_id, announced_to.protocol.node_id,
announced_to.protocol.external_ip, announced_to.protocol.external_ip,
announced_to.protocol.udp_port announced_to.protocol.udp_port

View file

@ -4,7 +4,7 @@ from torba.testcase import AsyncioTestCase
from tests import dht_mocks from tests import dht_mocks
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.dht.peer import PeerManager from lbry.dht.peer import PeerManager, make_kademlia_peer
class TestNodePingQueueDiscover(AsyncioTestCase): class TestNodePingQueueDiscover(AsyncioTestCase):
@ -42,7 +42,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
for i in range(1, len(peer_addresses)): for i in range(1, len(peer_addresses)):
node = nodes[i] node = nodes[i]
assert node.protocol.node_id != node_1.protocol.node_id assert node.protocol.node_id != node_1.protocol.node_id
peer = node_1.protocol.peer_manager.get_kademlia_peer( peer = make_kademlia_peer(
node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port
) )
futs.append(node_1.protocol.get_rpc_peer(peer).ping()) futs.append(node_1.protocol.get_rpc_peer(peer).ping())

View file

@ -1,7 +1,7 @@
import asyncio import asyncio
import unittest import unittest
from lbry.utils import generate_id from lbry.utils import generate_id
from lbry.dht.peer import PeerManager from lbry.dht.peer import PeerManager, make_kademlia_peer
from torba.testcase import AsyncioTestCase from torba.testcase import AsyncioTestCase
@ -10,20 +10,30 @@ class PeerTest(AsyncioTestCase):
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.peer_manager = PeerManager(self.loop) self.peer_manager = PeerManager(self.loop)
self.node_ids = [generate_id(), generate_id(), generate_id()] self.node_ids = [generate_id(), generate_id(), generate_id()]
self.first_contact = self.peer_manager.get_kademlia_peer(self.node_ids[1], '127.0.0.1', udp_port=1000) self.first_contact = make_kademlia_peer(self.node_ids[1], '127.0.0.1', udp_port=1000)
self.second_contact = self.peer_manager.get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) self.second_contact = make_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000)
def test_peer_is_good_unknown_peer(self):
# Scenario: peer replied, but caller doesn't know the node_id.
# Outcome: We can't say it's good or bad.
# (yes, we COULD tell the node id, but not here. It would be
# a side effect and the caller is responsible to discover it)
peer = make_kademlia_peer(None, '1.2.3.4', 4444)
self.peer_manager.report_last_requested('1.2.3.4', 4444)
self.peer_manager.report_last_replied('1.2.3.4', 4444)
self.assertIsNone(self.peer_manager.peer_is_good(peer))
def test_make_contact_error_cases(self): def test_make_contact_error_cases(self):
self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20', 100000) self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.1.20', 100000)
self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20.1', 1000) self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.1.20.1', 1000)
self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], 'this is not an ip', 1000) self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], 'this is not an ip', 1000)
self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20', -1000) self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.1.20', -1000)
self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, b'not valid node id', '192.168.1.20', 1000) self.assertRaises(ValueError, make_kademlia_peer, b'not valid node id', '192.168.1.20', 1000)
def test_boolean(self): def test_boolean(self):
self.assertNotEqual(self.first_contact, self.second_contact) self.assertNotEqual(self.first_contact, self.second_contact)
self.assertEqual( self.assertEqual(
self.second_contact, self.peer_manager.get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) self.second_contact, make_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000)
) )
def test_compact_ip(self): def test_compact_ip(self):

View file

@ -7,8 +7,7 @@ from lbry.blob.blob_file import MAX_BLOB_SIZE
from lbry.blob_exchange.serialization import BlobResponse from lbry.blob_exchange.serialization import BlobResponse
from lbry.blob_exchange.server import BlobServerProtocol from lbry.blob_exchange.server import BlobServerProtocol
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.dht.peer import KademliaPeer from lbry.dht.peer import make_kademlia_peer
from lbry.extras.daemon.storage import StoredStreamClaim
from lbry.stream.managed_stream import ManagedStream from lbry.stream.managed_stream import ManagedStream
from lbry.stream.descriptor import StreamDescriptor from lbry.stream.descriptor import StreamDescriptor
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
@ -95,7 +94,7 @@ class TestManagedStream(BlobExchangeTestBase):
mock_node = mock.Mock(spec=Node) mock_node = mock.Mock(spec=Node)
q = asyncio.Queue() q = asyncio.Queue()
bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334) bad_peer = make_kademlia_peer(b'2' * 48, "127.0.0.1", tcp_port=3334)
def _mock_accumulate_peers(q1, q2): def _mock_accumulate_peers(q1, q2):
async def _task(): async def _task():