forked from LBRYCommunity/lbry-sdk
delete old dht functional tests
This commit is contained in:
parent
34a409ade5
commit
ea15674e62
10 changed files with 0 additions and 938 deletions
|
@ -1,190 +0,0 @@
|
|||
import logging
|
||||
import binascii
|
||||
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer, task
|
||||
from lbrynet.dht import constants
|
||||
from lbrynet.dht.node import Node
|
||||
from .mock_transport import resolve, listenUDP, MOCK_DHT_SEED_DNS, mock_node_generator
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestKademliaBase(unittest.TestCase):
|
||||
timeout = 300.0 # timeout for each test
|
||||
network_size = 16 # including seed nodes
|
||||
node_ids = None
|
||||
seed_dns = MOCK_DHT_SEED_DNS
|
||||
|
||||
def _add_next_node(self):
|
||||
node_id, node_ip = next(self.mock_node_generator)
|
||||
node = Node(node_id=node_id, udpPort=4444, peerPort=3333, externalIP=node_ip,
|
||||
resolve=resolve, listenUDP=listenUDP, callLater=self.clock.callLater, clock=self.clock)
|
||||
self.nodes.append(node)
|
||||
return node
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_node(self, known_addresses):
|
||||
node = self._add_next_node()
|
||||
yield node.start(known_addresses)
|
||||
defer.returnValue(node)
|
||||
|
||||
def get_node(self, node_id):
|
||||
for node in self.nodes:
|
||||
if node.node_id == node_id:
|
||||
return node
|
||||
raise KeyError(node_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def pop_node(self):
|
||||
node = self.nodes.pop()
|
||||
yield node.stop()
|
||||
|
||||
def pump_clock(self, n, step=None, tick_callback=None):
|
||||
"""
|
||||
:param n: seconds to run the reactor for
|
||||
:param step: reactor tick rate (in seconds)
|
||||
"""
|
||||
advanced = 0.0
|
||||
self.clock._sortCalls()
|
||||
while advanced < n:
|
||||
if step:
|
||||
next_step = step
|
||||
elif self.clock.getDelayedCalls():
|
||||
next_call = self.clock.getDelayedCalls()[0].getTime()
|
||||
next_step = min(n - advanced, max(next_call - self.clock.rightNow, .000000000001))
|
||||
else:
|
||||
next_step = n - advanced
|
||||
assert next_step > 0
|
||||
self.clock.advance(next_step)
|
||||
advanced += float(next_step)
|
||||
if tick_callback and callable(tick_callback):
|
||||
tick_callback(self.clock.seconds())
|
||||
|
||||
def run_reactor(self, seconds, deferreds, tick_callback=None):
|
||||
d = defer.DeferredList(deferreds)
|
||||
self.pump_clock(seconds, tick_callback=tick_callback)
|
||||
return d
|
||||
|
||||
def get_contacts(self):
|
||||
contacts = {}
|
||||
for seed in self._seeds:
|
||||
contacts[seed] = seed.contacts
|
||||
for node in self._seeds:
|
||||
contacts[node] = node.contacts
|
||||
return contacts
|
||||
|
||||
def get_routable_addresses(self):
|
||||
known = set()
|
||||
for n in self._seeds:
|
||||
known.update([(c.id, c.address, c.port) for c in n.contacts])
|
||||
for n in self.nodes:
|
||||
known.update([(c.id, c.address, c.port) for c in n.contacts])
|
||||
addresses = {triple[1] for triple in known}
|
||||
return addresses
|
||||
|
||||
def get_online_addresses(self):
|
||||
online = set()
|
||||
for n in self._seeds:
|
||||
online.add(n.externalIP)
|
||||
for n in self.nodes:
|
||||
online.add(n.externalIP)
|
||||
return online
|
||||
|
||||
def show_info(self, show_contacts=False):
|
||||
known = set()
|
||||
for n in self._seeds:
|
||||
known.update([(c.id, c.address, c.port) for c in n.contacts])
|
||||
for n in self.nodes:
|
||||
known.update([(c.id, c.address, c.port) for c in n.contacts])
|
||||
|
||||
log.info("Routable: %i/%i", len(known), len(self.nodes) + len(self._seeds))
|
||||
if show_contacts:
|
||||
for n in self._seeds:
|
||||
log.info("seed %s (%s) has %i contacts in %i buckets", n.externalIP, binascii.hexlify(n.node_id)[:8], len(n.contacts),
|
||||
len([b for b in n._routingTable._buckets if b.getContacts()]))
|
||||
for n in self.nodes:
|
||||
log.info("node %s (%s) has %i contacts in %i buckets", n.externalIP, binascii.hexlify(n.node_id)[:8], len(n.contacts),
|
||||
len([b for b in n._routingTable._buckets if b.getContacts()]))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
import random
|
||||
random.seed(0)
|
||||
self.nodes = []
|
||||
self._seeds = []
|
||||
self.clock = task.Clock()
|
||||
self.mock_node_generator = mock_node_generator(mock_node_ids=self.node_ids)
|
||||
|
||||
seed_dl = []
|
||||
seeds = sorted(list(self.seed_dns.keys()))
|
||||
known_addresses = [(seed_name, 4444) for seed_name in seeds]
|
||||
for _ in range(len(seeds)):
|
||||
self._add_next_node()
|
||||
seed = self.nodes.pop()
|
||||
self._seeds.append(seed)
|
||||
seed_dl.append(
|
||||
seed.start(known_addresses)
|
||||
)
|
||||
yield self.run_reactor(constants.checkRefreshInterval+1, seed_dl)
|
||||
while len(self.nodes + self._seeds) < self.network_size:
|
||||
network_dl = []
|
||||
for i in range(min(10, self.network_size - len(self._seeds) - len(self.nodes))):
|
||||
network_dl.append(self.add_node(known_addresses))
|
||||
yield self.run_reactor(constants.checkRefreshInterval*2+1, network_dl)
|
||||
self.assertEqual(len(self.nodes + self._seeds), self.network_size)
|
||||
self.verify_all_nodes_are_routable()
|
||||
self.verify_all_nodes_are_pingable()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
dl = []
|
||||
while self.nodes:
|
||||
dl.append(self.pop_node()) # stop all of the nodes
|
||||
while self._seeds:
|
||||
dl.append(self._seeds.pop().stop()) # and the seeds
|
||||
yield defer.DeferredList(dl)
|
||||
|
||||
def verify_all_nodes_are_routable(self):
|
||||
routable = set()
|
||||
node_addresses = {node.externalIP for node in self.nodes}
|
||||
node_addresses = node_addresses.union({node.externalIP for node in self._seeds})
|
||||
for node in self._seeds:
|
||||
contact_addresses = {contact.address for contact in node.contacts}
|
||||
routable.update(contact_addresses)
|
||||
for node in self.nodes:
|
||||
contact_addresses = {contact.address for contact in node.contacts}
|
||||
routable.update(contact_addresses)
|
||||
self.assertSetEqual(routable, node_addresses)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def verify_all_nodes_are_pingable(self):
|
||||
ping_replies = {}
|
||||
ping_dl = []
|
||||
contacted = set()
|
||||
|
||||
def _ping_cb(result, node, replies):
|
||||
replies[node] = result
|
||||
|
||||
for node in self._seeds:
|
||||
contact_addresses = set()
|
||||
for contact in node.contacts:
|
||||
contact_addresses.add(contact.address)
|
||||
d = contact.ping()
|
||||
d.addCallback(_ping_cb, contact.address, ping_replies)
|
||||
contacted.add(contact.address)
|
||||
ping_dl.append(d)
|
||||
for node in self.nodes:
|
||||
contact_addresses = set()
|
||||
for contact in node.contacts:
|
||||
contact_addresses.add(contact.address)
|
||||
d = contact.ping()
|
||||
d.addCallback(_ping_cb, contact.address, ping_replies)
|
||||
contacted.add(contact.address)
|
||||
ping_dl.append(d)
|
||||
yield self.run_reactor(2, ping_dl)
|
||||
node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds})
|
||||
self.assertSetEqual(node_addresses, contacted)
|
||||
expected = {node: b"pong" for node in contacted}
|
||||
self.assertDictEqual(ping_replies, expected)
|
|
@ -1,151 +0,0 @@
|
|||
import struct
|
||||
import hashlib
|
||||
import logging
|
||||
from binascii import unhexlify
|
||||
|
||||
from twisted.internet import defer, error
|
||||
from lbrynet.dht import encoding
|
||||
from lbrynet.dht.error import DecodeError
|
||||
from lbrynet.dht.msgformat import DefaultFormat
|
||||
from lbrynet.dht.msgtypes import ResponseMessage, RequestMessage, ErrorMessage
|
||||
|
||||
_datagram_formatter = DefaultFormat()
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
MOCK_DHT_NODES = [
|
||||
unhexlify("cc8db9d0dd9b65b103594b5f992adf09f18b310958fa451d61ce8d06f3ee97a91461777c2b7dea1a89d02d2f23eb0e4f"),
|
||||
unhexlify("83a3a398eead3f162fbbe1afb3d63482bb5b6d3cdd8f9b0825c1dfa58dffd3f6f6026d6e64d6d4ae4c3dfe2262e734ba"),
|
||||
unhexlify("b6928ff25778a7bbb5d258d3b3a06e26db1654f3d2efce8c26681d43f7237cdf2e359a4d309c4473d5d89ec99fb4f573"),
|
||||
]
|
||||
|
||||
MOCK_DHT_SEED_DNS = { # these map to mock nodes 0, 1, and 2
|
||||
"lbrynet1.lbry.io": "10.42.42.1",
|
||||
"lbrynet2.lbry.io": "10.42.42.2",
|
||||
"lbrynet3.lbry.io": "10.42.42.3",
|
||||
"lbrynet4.lbry.io": "10.42.42.4",
|
||||
"lbrynet5.lbry.io": "10.42.42.5",
|
||||
"lbrynet6.lbry.io": "10.42.42.6",
|
||||
"lbrynet7.lbry.io": "10.42.42.7",
|
||||
"lbrynet8.lbry.io": "10.42.42.8",
|
||||
"lbrynet9.lbry.io": "10.42.42.9",
|
||||
"lbrynet10.lbry.io": "10.42.42.10",
|
||||
"lbrynet11.lbry.io": "10.42.42.11",
|
||||
"lbrynet12.lbry.io": "10.42.42.12",
|
||||
"lbrynet13.lbry.io": "10.42.42.13",
|
||||
"lbrynet14.lbry.io": "10.42.42.14",
|
||||
"lbrynet15.lbry.io": "10.42.42.15",
|
||||
"lbrynet16.lbry.io": "10.42.42.16",
|
||||
}
|
||||
|
||||
|
||||
def resolve(name, timeout=(1, 3, 11, 45)):
|
||||
if name not in MOCK_DHT_SEED_DNS:
|
||||
return defer.fail(error.DNSLookupError(name))
|
||||
return defer.succeed(MOCK_DHT_SEED_DNS[name])
|
||||
|
||||
|
||||
class MockUDPTransport:
|
||||
def __init__(self, address, port, max_packet_size, protocol):
|
||||
self.address = address
|
||||
self.port = port
|
||||
self.max_packet_size = max_packet_size
|
||||
self._node = protocol._node
|
||||
|
||||
def write(self, data, address):
|
||||
if address in MockNetwork.peers:
|
||||
dest = MockNetwork.peers[address][0]
|
||||
debug_kademlia_packet(data, (self.address, self.port), address, self._node)
|
||||
dest.datagramReceived(data, (self.address, self.port))
|
||||
else: # the node is sending to an address that doesn't currently exist, act like it never arrived
|
||||
pass
|
||||
|
||||
|
||||
class MockUDPPort:
|
||||
def __init__(self, protocol, remover):
|
||||
self.protocol = protocol
|
||||
self._remover = remover
|
||||
|
||||
def startListening(self, reason=None):
|
||||
return self.protocol.startProtocol()
|
||||
|
||||
def stopListening(self, reason=None):
|
||||
result = self.protocol.stopProtocol()
|
||||
self._remover()
|
||||
return result
|
||||
|
||||
|
||||
class MockNetwork:
|
||||
peers = {} # (interface, port): (protocol, max_packet_size)
|
||||
|
||||
@classmethod
|
||||
def add_peer(cls, port, protocol, interface, maxPacketSize):
|
||||
interface = protocol._node.externalIP
|
||||
protocol.transport = MockUDPTransport(interface, port, maxPacketSize, protocol)
|
||||
cls.peers[(interface, port)] = (protocol, maxPacketSize)
|
||||
|
||||
def remove_peer():
|
||||
del protocol.transport
|
||||
if (interface, port) in cls.peers:
|
||||
del cls.peers[(interface, port)]
|
||||
|
||||
return remove_peer
|
||||
|
||||
|
||||
def listenUDP(port, protocol, interface='', maxPacketSize=8192):
|
||||
remover = MockNetwork.add_peer(port, protocol, interface, maxPacketSize)
|
||||
port = MockUDPPort(protocol, remover)
|
||||
port.startListening()
|
||||
return port
|
||||
|
||||
|
||||
def address_generator(address=(10, 42, 42, 1)):
|
||||
def increment(addr):
|
||||
value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]).encode())[0] + 1
|
||||
new_addr = []
|
||||
for i in range(4):
|
||||
new_addr.append(value % 256)
|
||||
value >>= 8
|
||||
return tuple(new_addr[::-1])
|
||||
|
||||
while True:
|
||||
yield "{}.{}.{}.{}".format(*address)
|
||||
address = increment(address)
|
||||
|
||||
|
||||
def mock_node_generator(count=None, mock_node_ids=None):
|
||||
if mock_node_ids is None:
|
||||
mock_node_ids = MOCK_DHT_NODES
|
||||
|
||||
for num, node_ip in enumerate(address_generator()):
|
||||
if count and num >= count:
|
||||
break
|
||||
if num >= len(mock_node_ids):
|
||||
h = hashlib.sha384()
|
||||
h.update(("node %i" % num).encode())
|
||||
node_id = h.digest()
|
||||
else:
|
||||
node_id = mock_node_ids[num]
|
||||
yield (node_id, node_ip)
|
||||
|
||||
|
||||
def debug_kademlia_packet(data, source, destination, node):
|
||||
if log.level != logging.DEBUG:
|
||||
return
|
||||
try:
|
||||
packet = _datagram_formatter.fromPrimitive(encoding.bdecode(data))
|
||||
if isinstance(packet, RequestMessage):
|
||||
log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request,
|
||||
node.clock.seconds())
|
||||
elif isinstance(packet, ResponseMessage):
|
||||
if isinstance(packet.response, bytes):
|
||||
log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response,
|
||||
node.clock.seconds())
|
||||
else:
|
||||
log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0],
|
||||
len(packet.response), node.clock.seconds())
|
||||
elif isinstance(packet, ErrorMessage):
|
||||
log.error("error %s <-- %s %s (node time %s)", destination[0], source[0], packet.exceptionType,
|
||||
node.clock.seconds())
|
||||
except DecodeError:
|
||||
log.exception("decode error %s --> %s (node time %s)", source[0], destination[0], node.clock.seconds())
|
|
@ -1,34 +0,0 @@
|
|||
from twisted.trial import unittest
|
||||
|
||||
from tests.functional.dht.dht_test_environment import TestKademliaBase
|
||||
|
||||
|
||||
class TestKademliaBootstrap(TestKademliaBase):
|
||||
"""
|
||||
Test initializing the network / connecting the seed nodes
|
||||
"""
|
||||
|
||||
def test_bootstrap_seed_nodes(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestKademliaBootstrap40Nodes(TestKademliaBase):
|
||||
network_size = 40
|
||||
|
||||
def test_bootstrap_network(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestKademliaBootstrap80Nodes(TestKademliaBase):
|
||||
network_size = 80
|
||||
|
||||
def test_bootstrap_network(self):
|
||||
pass
|
||||
|
||||
|
||||
@unittest.SkipTest
|
||||
class TestKademliaBootstrap120Nodes(TestKademliaBase):
|
||||
network_size = 120
|
||||
|
||||
def test_bootstrap_network(self):
|
||||
pass
|
|
@ -1,40 +0,0 @@
|
|||
import logging
|
||||
from twisted.internet import defer
|
||||
from lbrynet.dht import constants
|
||||
from .dht_test_environment import TestKademliaBase
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class TestPeerExpiration(TestKademliaBase):
|
||||
network_size = 40
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_expire_stale_peers(self):
|
||||
removed_addresses = set()
|
||||
removed_nodes = []
|
||||
|
||||
# stop 5 nodes
|
||||
for _ in range(5):
|
||||
n = self.nodes[0]
|
||||
removed_nodes.append(n)
|
||||
removed_addresses.add(n.externalIP)
|
||||
self.nodes.remove(n)
|
||||
yield self.run_reactor(1, [n.stop()])
|
||||
|
||||
offline_addresses = self.get_routable_addresses().difference(self.get_online_addresses())
|
||||
self.assertSetEqual(offline_addresses, removed_addresses)
|
||||
|
||||
get_nodes_with_stale_contacts = lambda: list(filter(lambda node: any(contact.address in offline_addresses
|
||||
for contact in node.contacts),
|
||||
self.nodes + self._seeds))
|
||||
|
||||
self.assertRaises(AssertionError, self.verify_all_nodes_are_routable)
|
||||
self.assertGreater(len(get_nodes_with_stale_contacts()), 1)
|
||||
|
||||
# run the network long enough for two failures to happen
|
||||
self.pump_clock(constants.checkRefreshInterval * 3)
|
||||
|
||||
self.assertEqual(len(get_nodes_with_stale_contacts()), 0)
|
||||
self.verify_all_nodes_are_routable()
|
||||
self.verify_all_nodes_are_pingable()
|
|
@ -1,38 +0,0 @@
|
|||
import logging
|
||||
from twisted.internet import defer
|
||||
from lbrynet.dht import constants
|
||||
from .dht_test_environment import TestKademliaBase
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class TestReJoin(TestKademliaBase):
|
||||
network_size = 40
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
yield super().setUp()
|
||||
self.removed_node = self.nodes[20]
|
||||
self.nodes.remove(self.removed_node)
|
||||
yield self.run_reactor(1, [self.removed_node.stop()])
|
||||
self.pump_clock(constants.checkRefreshInterval * 2)
|
||||
self.verify_all_nodes_are_routable()
|
||||
self.verify_all_nodes_are_pingable()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_re_join(self):
|
||||
self.nodes.append(self.removed_node)
|
||||
yield self.run_reactor(
|
||||
31, [self.removed_node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])]
|
||||
)
|
||||
self.pump_clock(constants.checkRefreshInterval*2)
|
||||
self.verify_all_nodes_are_routable()
|
||||
self.verify_all_nodes_are_pingable()
|
||||
|
||||
def test_re_join_with_new_ip(self):
|
||||
self.removed_node.externalIP = "10.43.43.43"
|
||||
return self.test_re_join()
|
||||
|
||||
def test_re_join_with_new_node_id(self):
|
||||
self.removed_node.node_id = self.removed_node._generateID()
|
||||
return self.test_re_join()
|
|
@ -1,285 +0,0 @@
|
|||
from binascii import unhexlify
|
||||
|
||||
import time
|
||||
from twisted.trial import unittest
|
||||
import logging
|
||||
from twisted.internet.task import Clock
|
||||
from twisted.internet import defer
|
||||
import lbrynet.dht.protocol
|
||||
import lbrynet.dht.contact
|
||||
from lbrynet.dht.error import TimeoutError
|
||||
from lbrynet.dht.node import Node, rpcmethod
|
||||
from .mock_transport import listenUDP, resolve
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class KademliaProtocolTest(unittest.TestCase):
|
||||
""" Test case for the Protocol class """
|
||||
|
||||
udpPort = 9182
|
||||
|
||||
def setUp(self):
|
||||
self._reactor = Clock()
|
||||
self.node = Node(node_id=b'1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP,
|
||||
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
|
||||
self.remote_node = Node(node_id=b'2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
|
||||
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
|
||||
self.remote_contact = self.node.contact_manager.make_contact(b'2' * 48, '127.0.0.2', 9182, self.node._protocol)
|
||||
self.us_from_them = self.remote_node.contact_manager.make_contact(b'1' * 48, '127.0.0.1', 9182,
|
||||
self.remote_node._protocol)
|
||||
self.node.start_listening()
|
||||
self.remote_node.start_listening()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
yield self.node.stop()
|
||||
yield self.remote_node.stop()
|
||||
del self._reactor
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def testReactor(self):
|
||||
""" Tests if the reactor can start/stop the protocol correctly """
|
||||
|
||||
d = defer.Deferred()
|
||||
self._reactor.callLater(1, d.callback, True)
|
||||
self._reactor.advance(1)
|
||||
result = yield d
|
||||
self.assertTrue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def testRPCTimeout(self):
|
||||
""" Tests if a RPC message sent to a dead remote node times out correctly """
|
||||
yield self.remote_node.stop()
|
||||
self._reactor.pump([1 for _ in range(10)])
|
||||
self.node.addContact(self.remote_contact)
|
||||
|
||||
@rpcmethod
|
||||
def fake_ping(*args, **kwargs):
|
||||
time.sleep(lbrynet.dht.constants.rpcTimeout + 1)
|
||||
return 'pong'
|
||||
|
||||
real_ping = self.node.ping
|
||||
real_timeout = lbrynet.dht.constants.rpcTimeout
|
||||
real_attempts = lbrynet.dht.constants.rpcAttempts
|
||||
lbrynet.dht.constants.rpcAttempts = 1
|
||||
lbrynet.dht.constants.rpcTimeout = 1
|
||||
|
||||
self.node.ping = fake_ping
|
||||
# Make sure the contact was added
|
||||
self.assertFalse(self.remote_contact not in self.node.contacts,
|
||||
'Contact not added to fake node (error in test code)')
|
||||
self.node.start_listening()
|
||||
|
||||
# Run the PING RPC (which should raise a timeout error)
|
||||
df = self.remote_contact.ping()
|
||||
|
||||
def check_timeout(err):
|
||||
self.assertEqual(err.type, TimeoutError)
|
||||
|
||||
df.addErrback(check_timeout)
|
||||
|
||||
def reset_values():
|
||||
self.node.ping = real_ping
|
||||
lbrynet.dht.constants.rpcTimeout = real_timeout
|
||||
lbrynet.dht.constants.rpcAttempts = real_attempts
|
||||
|
||||
# See if the contact was removed due to the timeout
|
||||
def check_removed_contact():
|
||||
self.assertFalse(self.remote_contact in self.node.contacts,
|
||||
'Contact was not removed after RPC timeout; check exception types.')
|
||||
|
||||
df.addCallback(lambda _: reset_values())
|
||||
|
||||
# Stop the reactor if a result arrives (timeout or not)
|
||||
df.addCallback(lambda _: check_removed_contact())
|
||||
self._reactor.pump([1 for _ in range(20)])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def testRPCRequest(self):
|
||||
""" Tests if a valid RPC request is executed and responded to correctly """
|
||||
|
||||
yield self.node.addContact(self.remote_contact)
|
||||
|
||||
self.error = None
|
||||
|
||||
def handleError(f):
|
||||
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
|
||||
|
||||
def handleResult(result):
|
||||
expectedResult = b'pong'
|
||||
if result != expectedResult:
|
||||
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \
|
||||
% (expectedResult, result)
|
||||
|
||||
# Simulate the RPC
|
||||
df = self.remote_contact.ping()
|
||||
df.addCallback(handleResult)
|
||||
df.addErrback(handleError)
|
||||
|
||||
self._reactor.advance(2)
|
||||
yield df
|
||||
|
||||
self.assertFalse(self.error, self.error)
|
||||
# The list of sent RPC messages should be empty at this stage
|
||||
self.assertEqual(len(self.node._protocol._sentMessages), 0,
|
||||
'The protocol is still waiting for a RPC result, '
|
||||
'but the transaction is already done!')
|
||||
|
||||
def testRPCAccess(self):
|
||||
""" Tests invalid RPC requests
|
||||
Verifies that a RPC request for an existing but unpublished
|
||||
method is denied, and that the associated (remote) exception gets
|
||||
raised locally """
|
||||
|
||||
self.assertRaises(AttributeError, getattr, self.remote_contact, "not_a_rpc_function")
|
||||
|
||||
def testRPCRequestArgs(self):
|
||||
""" Tests if an RPC requiring arguments is executed correctly """
|
||||
|
||||
self.node.addContact(self.remote_contact)
|
||||
self.error = None
|
||||
|
||||
def handleError(f):
|
||||
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
|
||||
|
||||
def handleResult(result):
|
||||
expectedResult = b'pong'
|
||||
if result != expectedResult:
|
||||
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \
|
||||
(expectedResult, result)
|
||||
|
||||
# Publish the "local" node on the network
|
||||
self.node.start_listening()
|
||||
# Simulate the RPC
|
||||
df = self.remote_contact.ping()
|
||||
df.addCallback(handleResult)
|
||||
df.addErrback(handleError)
|
||||
self._reactor.pump([1 for _ in range(10)])
|
||||
self.assertFalse(self.error, self.error)
|
||||
# The list of sent RPC messages should be empty at this stage
|
||||
self.assertEqual(len(self.node._protocol._sentMessages), 0,
|
||||
'The protocol is still waiting for a RPC result, '
|
||||
'but the transaction is already done!')
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def testDetectProtocolVersion(self):
|
||||
original_findvalue = self.remote_node.findValue
|
||||
fake_blob = unhexlify("AB" * 48)
|
||||
|
||||
@rpcmethod
|
||||
def findValue(contact, key):
|
||||
result = original_findvalue(contact, key)
|
||||
result.pop(b'protocolVersion')
|
||||
return result
|
||||
|
||||
self.remote_node.findValue = findValue
|
||||
d = self.remote_contact.findValue(fake_blob)
|
||||
self._reactor.advance(3)
|
||||
find_value_response = yield d
|
||||
self.assertEqual(self.remote_contact.protocolVersion, 0)
|
||||
self.assertNotIn('protocolVersion', find_value_response)
|
||||
|
||||
self.remote_node.findValue = original_findvalue
|
||||
d = self.remote_contact.findValue(fake_blob)
|
||||
self._reactor.advance(3)
|
||||
find_value_response = yield d
|
||||
self.assertEqual(self.remote_contact.protocolVersion, 1)
|
||||
self.assertNotIn('protocolVersion', find_value_response)
|
||||
|
||||
self.remote_node.findValue = findValue
|
||||
d = self.remote_contact.findValue(fake_blob)
|
||||
self._reactor.advance(3)
|
||||
find_value_response = yield d
|
||||
self.assertEqual(self.remote_contact.protocolVersion, 0)
|
||||
self.assertNotIn('protocolVersion', find_value_response)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def testStoreToPre_0_20_0_Node(self):
|
||||
def _dont_migrate(contact, method, *args):
|
||||
return args, {}
|
||||
|
||||
self.remote_node._protocol._migrate_incoming_rpc_args = _dont_migrate
|
||||
|
||||
original_findvalue = self.remote_node.findValue
|
||||
original_store = self.remote_node.store
|
||||
|
||||
@rpcmethod
|
||||
def findValue(contact, key):
|
||||
result = original_findvalue(contact, key)
|
||||
if b'protocolVersion' in result:
|
||||
result.pop(b'protocolVersion')
|
||||
return result
|
||||
|
||||
@rpcmethod
|
||||
def store(contact, key, value, originalPublisherID=None, self_store=False, **kwargs):
|
||||
self.assertEqual(len(key), 48)
|
||||
self.assertSetEqual(set(value.keys()), {b'token', b'lbryid', b'port'})
|
||||
self.assertFalse(self_store)
|
||||
self.assertDictEqual(kwargs, {})
|
||||
return original_store( # pylint: disable=too-many-function-args
|
||||
contact, key, value[b'token'], value[b'port'], originalPublisherID, 0
|
||||
)
|
||||
|
||||
self.remote_node.findValue = findValue
|
||||
self.remote_node.store = store
|
||||
|
||||
fake_blob = unhexlify("AB" * 48)
|
||||
|
||||
d = self.remote_contact.findValue(fake_blob)
|
||||
self._reactor.advance(3)
|
||||
find_value_response = yield d
|
||||
self.assertEqual(self.remote_contact.protocolVersion, 0)
|
||||
self.assertNotIn(b'protocolVersion', find_value_response)
|
||||
token = find_value_response[b'token']
|
||||
d = self.remote_contact.store(fake_blob, token, 3333, self.node.node_id, 0)
|
||||
self._reactor.advance(3)
|
||||
response = yield d
|
||||
self.assertEqual(response, b'OK')
|
||||
self.assertEqual(self.remote_contact.protocolVersion, 0)
|
||||
self.assertTrue(self.remote_node._dataStore.hasPeersForBlob(fake_blob))
|
||||
self.assertEqual(len(self.remote_node._dataStore.getStoringContacts()), 1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def testStoreFromPre_0_20_0_Node(self):
|
||||
def _dont_migrate(contact, method, *args):
|
||||
return args
|
||||
|
||||
self.remote_node._protocol._migrate_outgoing_rpc_args = _dont_migrate
|
||||
|
||||
us_from_them = self.remote_node.contact_manager.make_contact(b'1' * 48, '127.0.0.1', self.udpPort,
|
||||
self.remote_node._protocol)
|
||||
|
||||
fake_blob = unhexlify("AB" * 48)
|
||||
|
||||
d = us_from_them.findValue(fake_blob)
|
||||
self._reactor.advance(3)
|
||||
find_value_response = yield d
|
||||
self.assertEqual(self.remote_contact.protocolVersion, 0)
|
||||
self.assertNotIn(b'protocolVersion', find_value_response)
|
||||
token = find_value_response[b'token']
|
||||
us_from_them.update_protocol_version(0)
|
||||
d = self.remote_node._protocol.sendRPC(
|
||||
us_from_them, b"store", (fake_blob, {b'lbryid': self.remote_node.node_id, b'token': token, b'port': 3333})
|
||||
)
|
||||
self._reactor.advance(3)
|
||||
response = yield d
|
||||
self.assertEqual(response, b'OK')
|
||||
self.assertEqual(self.remote_contact.protocolVersion, 0)
|
||||
self.assertTrue(self.node._dataStore.hasPeersForBlob(fake_blob))
|
||||
self.assertEqual(len(self.node._dataStore.getStoringContacts()), 1)
|
||||
self.assertIs(self.node._dataStore.getStoringContacts()[0], self.remote_contact)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_find_node(self):
|
||||
self.node.addContact(self.node.contact_manager.make_contact(
|
||||
self.remote_contact.id, self.remote_contact.address, self.remote_contact.port, self.node._protocol)
|
||||
)
|
||||
result = self.node.findContact(b'0'*48)
|
||||
for _ in range(6):
|
||||
self._reactor.advance(1)
|
||||
self.assertIsNone((yield result))
|
||||
result = self.node.findContact(self.remote_contact.id)
|
||||
for _ in range(6):
|
||||
self._reactor.advance(1)
|
||||
self.assertEqual((yield result).id, self.remote_contact.id)
|
|
@ -1,28 +0,0 @@
|
|||
from lbrynet.dht import constants
|
||||
from lbrynet.dht.distance import Distance
|
||||
import logging
|
||||
|
||||
from tests.functional.dht.dht_test_environment import TestKademliaBase
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class TestFindNode(TestKademliaBase):
|
||||
"""
|
||||
This tests the local routing table lookup for a node, every node should return the sorted k contacts closest
|
||||
to the querying node (even if the key being looked up is known)
|
||||
"""
|
||||
network_size = 35
|
||||
|
||||
def test_find_node(self):
|
||||
last_node_id = self.nodes[-1].node_id
|
||||
to_last_node = Distance(last_node_id)
|
||||
for n in self.nodes:
|
||||
find_close_nodes_result = n._routingTable.findCloseNodes(last_node_id, constants.k)
|
||||
self.assertEqual(len(find_close_nodes_result), constants.k)
|
||||
found_ids = [c.id for c in find_close_nodes_result]
|
||||
self.assertListEqual(found_ids, sorted(found_ids, key=lambda x: to_last_node(x)))
|
||||
if last_node_id in [c.id for c in n.contacts]:
|
||||
self.assertEqual(found_ids[0], last_node_id)
|
||||
else:
|
||||
self.assertNotIn(last_node_id, found_ids)
|
|
@ -1,172 +0,0 @@
|
|||
import struct
|
||||
from binascii import hexlify
|
||||
|
||||
from twisted.internet import defer
|
||||
from lbrynet.dht import constants
|
||||
from lbrynet.utils import generate_id
|
||||
from .dht_test_environment import TestKademliaBase
|
||||
import logging
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class TestStoreExpiration(TestKademliaBase):
|
||||
network_size = 40
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_nullify_token(self):
|
||||
blob_hash = generate_id(1)
|
||||
announcing_node = self.nodes[20]
|
||||
# announce the blob
|
||||
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||
self.pump_clock(5+1)
|
||||
storing_node_ids = yield announce_d
|
||||
self.assertEqual(len(storing_node_ids), 8)
|
||||
|
||||
for node in set(self.nodes).union(set(self._seeds)):
|
||||
# now, everyone has the wrong token
|
||||
node.change_token()
|
||||
node.change_token()
|
||||
|
||||
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||
self.pump_clock(5+1)
|
||||
storing_node_ids = yield announce_d
|
||||
self.assertEqual(len(storing_node_ids), 0) # can't store, wrong tokens, but they get nullified
|
||||
|
||||
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||
self.pump_clock(5+1)
|
||||
storing_node_ids = yield announce_d
|
||||
self.assertEqual(len(storing_node_ids), 8) # next attempt succeeds as it refreshes tokens
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_store_and_expire(self):
|
||||
blob_hash = generate_id(1)
|
||||
announcing_node = self.nodes[20]
|
||||
# announce the blob
|
||||
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||
self.pump_clock(5+1)
|
||||
storing_node_ids = yield announce_d
|
||||
all_nodes = set(self.nodes).union(set(self._seeds))
|
||||
|
||||
# verify the nodes we think stored it did actually store it
|
||||
storing_nodes = [node for node in all_nodes if hexlify(node.node_id) in storing_node_ids]
|
||||
self.assertEqual(len(storing_nodes), len(storing_node_ids))
|
||||
self.assertEqual(len(storing_nodes), constants.k)
|
||||
for node in storing_nodes:
|
||||
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port),
|
||||
node._dataStore.getStoringContacts())), [(announcing_node.node_id,
|
||||
announcing_node.externalIP,
|
||||
announcing_node.port)])
|
||||
self.assertEqual(len(datastore_result), 1)
|
||||
expanded_peers = []
|
||||
for peer in datastore_result:
|
||||
host = ".".join([str(d) for d in peer[:4]])
|
||||
port, = struct.unpack('>H', peer[4:6])
|
||||
peer_node_id = peer[6:]
|
||||
if (host, port, peer_node_id) not in expanded_peers:
|
||||
expanded_peers.append((peer_node_id, host, port))
|
||||
self.assertEqual(expanded_peers[0],
|
||||
(announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
|
||||
|
||||
# verify the announced blob expires in the storing nodes datastores
|
||||
|
||||
self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead
|
||||
for node in storing_nodes:
|
||||
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(len(datastore_result), 0)
|
||||
self.assertIn(blob_hash, node._dataStore) # the looping call shouldn't have removed it yet
|
||||
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
|
||||
|
||||
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
|
||||
for node in storing_nodes:
|
||||
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(len(datastore_result), 0)
|
||||
self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
|
||||
self.assertNotIn(blob_hash, node._dataStore.keys()) # the looping call should have fired
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_storing_node_went_stale_then_came_back(self):
|
||||
blob_hash = generate_id(1)
|
||||
announcing_node = self.nodes[20]
|
||||
# announce the blob
|
||||
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||
self.pump_clock(5+1)
|
||||
storing_node_ids = yield announce_d
|
||||
all_nodes = set(self.nodes).union(set(self._seeds))
|
||||
|
||||
# verify the nodes we think stored it did actually store it
|
||||
storing_nodes = [node for node in all_nodes if hexlify(node.node_id) in storing_node_ids]
|
||||
self.assertEqual(len(storing_nodes), len(storing_node_ids))
|
||||
self.assertEqual(len(storing_nodes), constants.k)
|
||||
for node in storing_nodes:
|
||||
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port),
|
||||
node._dataStore.getStoringContacts())), [(announcing_node.node_id,
|
||||
announcing_node.externalIP,
|
||||
announcing_node.port)])
|
||||
self.assertEqual(len(datastore_result), 1)
|
||||
expanded_peers = []
|
||||
for peer in datastore_result:
|
||||
host = ".".join([str(d) for d in peer[:4]])
|
||||
port, = struct.unpack('>H', peer[4:6])
|
||||
peer_node_id = peer[6:]
|
||||
if (host, port, peer_node_id) not in expanded_peers:
|
||||
expanded_peers.append((peer_node_id, host, port))
|
||||
self.assertEqual(expanded_peers[0],
|
||||
(announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
|
||||
|
||||
self.pump_clock(constants.checkRefreshInterval*2)
|
||||
|
||||
# stop the node
|
||||
self.nodes.remove(announcing_node)
|
||||
yield self.run_reactor(31, [announcing_node.stop()])
|
||||
# run the network for an hour, which should expire the removed node and turn the announced value stale
|
||||
self.pump_clock(constants.checkRefreshInterval * 5, constants.checkRefreshInterval/2)
|
||||
self.verify_all_nodes_are_routable()
|
||||
|
||||
# make sure the contact isn't returned as a peer for the blob, but that we still have the entry in the
|
||||
# datastore in case the node comes back
|
||||
for node in storing_nodes:
|
||||
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(len(datastore_result), 0)
|
||||
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
|
||||
self.assertIn(blob_hash, node._dataStore)
|
||||
|
||||
# # bring the announcing node back online
|
||||
self.nodes.append(announcing_node)
|
||||
yield self.run_reactor(
|
||||
31, [announcing_node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])]
|
||||
)
|
||||
self.pump_clock(constants.checkRefreshInterval * 2)
|
||||
self.verify_all_nodes_are_routable()
|
||||
|
||||
# now the announcing node should once again be returned as a peer for the blob
|
||||
for node in storing_nodes:
|
||||
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(len(datastore_result), 1)
|
||||
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
|
||||
self.assertIn(blob_hash, node._dataStore)
|
||||
|
||||
# verify the announced blob expires in the storing nodes datastores
|
||||
self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead
|
||||
for node in storing_nodes:
|
||||
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(len(datastore_result), 0)
|
||||
self.assertIn(blob_hash, node._dataStore) # the looping call shouldn't have removed it yet
|
||||
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
|
||||
|
||||
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
|
||||
for node in storing_nodes:
|
||||
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
|
||||
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||
self.assertEqual(len(datastore_result), 0)
|
||||
self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
|
||||
self.assertNotIn(blob_hash, node._dataStore) # the looping call should have fired
|
Loading…
Reference in a new issue