diff --git a/tests/functional/__init__.py b/tests/functional/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/functional/dht/__init__.py b/tests/functional/dht/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/functional/dht/dht_test_environment.py b/tests/functional/dht/dht_test_environment.py deleted file mode 100644 index 7dd138dc0..000000000 --- a/tests/functional/dht/dht_test_environment.py +++ /dev/null @@ -1,190 +0,0 @@ -import logging -import binascii - -from twisted.trial import unittest -from twisted.internet import defer, task -from lbrynet.dht import constants -from lbrynet.dht.node import Node -from .mock_transport import resolve, listenUDP, MOCK_DHT_SEED_DNS, mock_node_generator - - -log = logging.getLogger(__name__) - - -class TestKademliaBase(unittest.TestCase): - timeout = 300.0 # timeout for each test - network_size = 16 # including seed nodes - node_ids = None - seed_dns = MOCK_DHT_SEED_DNS - - def _add_next_node(self): - node_id, node_ip = next(self.mock_node_generator) - node = Node(node_id=node_id, udpPort=4444, peerPort=3333, externalIP=node_ip, - resolve=resolve, listenUDP=listenUDP, callLater=self.clock.callLater, clock=self.clock) - self.nodes.append(node) - return node - - @defer.inlineCallbacks - def add_node(self, known_addresses): - node = self._add_next_node() - yield node.start(known_addresses) - defer.returnValue(node) - - def get_node(self, node_id): - for node in self.nodes: - if node.node_id == node_id: - return node - raise KeyError(node_id) - - @defer.inlineCallbacks - def pop_node(self): - node = self.nodes.pop() - yield node.stop() - - def pump_clock(self, n, step=None, tick_callback=None): - """ - :param n: seconds to run the reactor for - :param step: reactor tick rate (in seconds) - """ - advanced = 0.0 - self.clock._sortCalls() - while advanced < n: - if step: - next_step = step - elif self.clock.getDelayedCalls(): - next_call = self.clock.getDelayedCalls()[0].getTime() - next_step = min(n - advanced, max(next_call - self.clock.rightNow, .000000000001)) - else: - next_step = n - advanced - assert next_step > 0 - self.clock.advance(next_step) - advanced += float(next_step) - if tick_callback and callable(tick_callback): - tick_callback(self.clock.seconds()) - - def run_reactor(self, seconds, deferreds, tick_callback=None): - d = defer.DeferredList(deferreds) - self.pump_clock(seconds, tick_callback=tick_callback) - return d - - def get_contacts(self): - contacts = {} - for seed in self._seeds: - contacts[seed] = seed.contacts - for node in self._seeds: - contacts[node] = node.contacts - return contacts - - def get_routable_addresses(self): - known = set() - for n in self._seeds: - known.update([(c.id, c.address, c.port) for c in n.contacts]) - for n in self.nodes: - known.update([(c.id, c.address, c.port) for c in n.contacts]) - addresses = {triple[1] for triple in known} - return addresses - - def get_online_addresses(self): - online = set() - for n in self._seeds: - online.add(n.externalIP) - for n in self.nodes: - online.add(n.externalIP) - return online - - def show_info(self, show_contacts=False): - known = set() - for n in self._seeds: - known.update([(c.id, c.address, c.port) for c in n.contacts]) - for n in self.nodes: - known.update([(c.id, c.address, c.port) for c in n.contacts]) - - log.info("Routable: %i/%i", len(known), len(self.nodes) + len(self._seeds)) - if show_contacts: - for n in self._seeds: - log.info("seed %s (%s) has %i contacts in %i buckets", n.externalIP, binascii.hexlify(n.node_id)[:8], len(n.contacts), - len([b for b in n._routingTable._buckets if b.getContacts()])) - for n in self.nodes: - log.info("node %s (%s) has %i contacts in %i buckets", n.externalIP, binascii.hexlify(n.node_id)[:8], len(n.contacts), - len([b for b in n._routingTable._buckets if b.getContacts()])) - - @defer.inlineCallbacks - def setUp(self): - import random - random.seed(0) - self.nodes = [] - self._seeds = [] - self.clock = task.Clock() - self.mock_node_generator = mock_node_generator(mock_node_ids=self.node_ids) - - seed_dl = [] - seeds = sorted(list(self.seed_dns.keys())) - known_addresses = [(seed_name, 4444) for seed_name in seeds] - for _ in range(len(seeds)): - self._add_next_node() - seed = self.nodes.pop() - self._seeds.append(seed) - seed_dl.append( - seed.start(known_addresses) - ) - yield self.run_reactor(constants.checkRefreshInterval+1, seed_dl) - while len(self.nodes + self._seeds) < self.network_size: - network_dl = [] - for i in range(min(10, self.network_size - len(self._seeds) - len(self.nodes))): - network_dl.append(self.add_node(known_addresses)) - yield self.run_reactor(constants.checkRefreshInterval*2+1, network_dl) - self.assertEqual(len(self.nodes + self._seeds), self.network_size) - self.verify_all_nodes_are_routable() - self.verify_all_nodes_are_pingable() - - @defer.inlineCallbacks - def tearDown(self): - dl = [] - while self.nodes: - dl.append(self.pop_node()) # stop all of the nodes - while self._seeds: - dl.append(self._seeds.pop().stop()) # and the seeds - yield defer.DeferredList(dl) - - def verify_all_nodes_are_routable(self): - routable = set() - node_addresses = {node.externalIP for node in self.nodes} - node_addresses = node_addresses.union({node.externalIP for node in self._seeds}) - for node in self._seeds: - contact_addresses = {contact.address for contact in node.contacts} - routable.update(contact_addresses) - for node in self.nodes: - contact_addresses = {contact.address for contact in node.contacts} - routable.update(contact_addresses) - self.assertSetEqual(routable, node_addresses) - - @defer.inlineCallbacks - def verify_all_nodes_are_pingable(self): - ping_replies = {} - ping_dl = [] - contacted = set() - - def _ping_cb(result, node, replies): - replies[node] = result - - for node in self._seeds: - contact_addresses = set() - for contact in node.contacts: - contact_addresses.add(contact.address) - d = contact.ping() - d.addCallback(_ping_cb, contact.address, ping_replies) - contacted.add(contact.address) - ping_dl.append(d) - for node in self.nodes: - contact_addresses = set() - for contact in node.contacts: - contact_addresses.add(contact.address) - d = contact.ping() - d.addCallback(_ping_cb, contact.address, ping_replies) - contacted.add(contact.address) - ping_dl.append(d) - yield self.run_reactor(2, ping_dl) - node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds}) - self.assertSetEqual(node_addresses, contacted) - expected = {node: b"pong" for node in contacted} - self.assertDictEqual(ping_replies, expected) diff --git a/tests/functional/dht/mock_transport.py b/tests/functional/dht/mock_transport.py deleted file mode 100644 index 3bc91061a..000000000 --- a/tests/functional/dht/mock_transport.py +++ /dev/null @@ -1,151 +0,0 @@ -import struct -import hashlib -import logging -from binascii import unhexlify - -from twisted.internet import defer, error -from lbrynet.dht import encoding -from lbrynet.dht.error import DecodeError -from lbrynet.dht.msgformat import DefaultFormat -from lbrynet.dht.msgtypes import ResponseMessage, RequestMessage, ErrorMessage - -_datagram_formatter = DefaultFormat() - -log = logging.getLogger() - -MOCK_DHT_NODES = [ - unhexlify("cc8db9d0dd9b65b103594b5f992adf09f18b310958fa451d61ce8d06f3ee97a91461777c2b7dea1a89d02d2f23eb0e4f"), - unhexlify("83a3a398eead3f162fbbe1afb3d63482bb5b6d3cdd8f9b0825c1dfa58dffd3f6f6026d6e64d6d4ae4c3dfe2262e734ba"), - unhexlify("b6928ff25778a7bbb5d258d3b3a06e26db1654f3d2efce8c26681d43f7237cdf2e359a4d309c4473d5d89ec99fb4f573"), -] - -MOCK_DHT_SEED_DNS = { # these map to mock nodes 0, 1, and 2 - "lbrynet1.lbry.io": "10.42.42.1", - "lbrynet2.lbry.io": "10.42.42.2", - "lbrynet3.lbry.io": "10.42.42.3", - "lbrynet4.lbry.io": "10.42.42.4", - "lbrynet5.lbry.io": "10.42.42.5", - "lbrynet6.lbry.io": "10.42.42.6", - "lbrynet7.lbry.io": "10.42.42.7", - "lbrynet8.lbry.io": "10.42.42.8", - "lbrynet9.lbry.io": "10.42.42.9", - "lbrynet10.lbry.io": "10.42.42.10", - "lbrynet11.lbry.io": "10.42.42.11", - "lbrynet12.lbry.io": "10.42.42.12", - "lbrynet13.lbry.io": "10.42.42.13", - "lbrynet14.lbry.io": "10.42.42.14", - "lbrynet15.lbry.io": "10.42.42.15", - "lbrynet16.lbry.io": "10.42.42.16", -} - - -def resolve(name, timeout=(1, 3, 11, 45)): - if name not in MOCK_DHT_SEED_DNS: - return defer.fail(error.DNSLookupError(name)) - return defer.succeed(MOCK_DHT_SEED_DNS[name]) - - -class MockUDPTransport: - def __init__(self, address, port, max_packet_size, protocol): - self.address = address - self.port = port - self.max_packet_size = max_packet_size - self._node = protocol._node - - def write(self, data, address): - if address in MockNetwork.peers: - dest = MockNetwork.peers[address][0] - debug_kademlia_packet(data, (self.address, self.port), address, self._node) - dest.datagramReceived(data, (self.address, self.port)) - else: # the node is sending to an address that doesn't currently exist, act like it never arrived - pass - - -class MockUDPPort: - def __init__(self, protocol, remover): - self.protocol = protocol - self._remover = remover - - def startListening(self, reason=None): - return self.protocol.startProtocol() - - def stopListening(self, reason=None): - result = self.protocol.stopProtocol() - self._remover() - return result - - -class MockNetwork: - peers = {} # (interface, port): (protocol, max_packet_size) - - @classmethod - def add_peer(cls, port, protocol, interface, maxPacketSize): - interface = protocol._node.externalIP - protocol.transport = MockUDPTransport(interface, port, maxPacketSize, protocol) - cls.peers[(interface, port)] = (protocol, maxPacketSize) - - def remove_peer(): - del protocol.transport - if (interface, port) in cls.peers: - del cls.peers[(interface, port)] - - return remove_peer - - -def listenUDP(port, protocol, interface='', maxPacketSize=8192): - remover = MockNetwork.add_peer(port, protocol, interface, maxPacketSize) - port = MockUDPPort(protocol, remover) - port.startListening() - return port - - -def address_generator(address=(10, 42, 42, 1)): - def increment(addr): - value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]).encode())[0] + 1 - new_addr = [] - for i in range(4): - new_addr.append(value % 256) - value >>= 8 - return tuple(new_addr[::-1]) - - while True: - yield "{}.{}.{}.{}".format(*address) - address = increment(address) - - -def mock_node_generator(count=None, mock_node_ids=None): - if mock_node_ids is None: - mock_node_ids = MOCK_DHT_NODES - - for num, node_ip in enumerate(address_generator()): - if count and num >= count: - break - if num >= len(mock_node_ids): - h = hashlib.sha384() - h.update(("node %i" % num).encode()) - node_id = h.digest() - else: - node_id = mock_node_ids[num] - yield (node_id, node_ip) - - -def debug_kademlia_packet(data, source, destination, node): - if log.level != logging.DEBUG: - return - try: - packet = _datagram_formatter.fromPrimitive(encoding.bdecode(data)) - if isinstance(packet, RequestMessage): - log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request, - node.clock.seconds()) - elif isinstance(packet, ResponseMessage): - if isinstance(packet.response, bytes): - log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response, - node.clock.seconds()) - else: - log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0], - len(packet.response), node.clock.seconds()) - elif isinstance(packet, ErrorMessage): - log.error("error %s <-- %s %s (node time %s)", destination[0], source[0], packet.exceptionType, - node.clock.seconds()) - except DecodeError: - log.exception("decode error %s --> %s (node time %s)", source[0], destination[0], node.clock.seconds()) diff --git a/tests/functional/dht/test_bootstrap_network.py b/tests/functional/dht/test_bootstrap_network.py deleted file mode 100644 index 82b2fc410..000000000 --- a/tests/functional/dht/test_bootstrap_network.py +++ /dev/null @@ -1,34 +0,0 @@ -from twisted.trial import unittest - -from tests.functional.dht.dht_test_environment import TestKademliaBase - - -class TestKademliaBootstrap(TestKademliaBase): - """ - Test initializing the network / connecting the seed nodes - """ - - def test_bootstrap_seed_nodes(self): - pass - - -class TestKademliaBootstrap40Nodes(TestKademliaBase): - network_size = 40 - - def test_bootstrap_network(self): - pass - - -class TestKademliaBootstrap80Nodes(TestKademliaBase): - network_size = 80 - - def test_bootstrap_network(self): - pass - - -@unittest.SkipTest -class TestKademliaBootstrap120Nodes(TestKademliaBase): - network_size = 120 - - def test_bootstrap_network(self): - pass diff --git a/tests/functional/dht/test_contact_expiration.py b/tests/functional/dht/test_contact_expiration.py deleted file mode 100644 index 559d9ec75..000000000 --- a/tests/functional/dht/test_contact_expiration.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -from twisted.internet import defer -from lbrynet.dht import constants -from .dht_test_environment import TestKademliaBase - -log = logging.getLogger() - - -class TestPeerExpiration(TestKademliaBase): - network_size = 40 - - @defer.inlineCallbacks - def test_expire_stale_peers(self): - removed_addresses = set() - removed_nodes = [] - - # stop 5 nodes - for _ in range(5): - n = self.nodes[0] - removed_nodes.append(n) - removed_addresses.add(n.externalIP) - self.nodes.remove(n) - yield self.run_reactor(1, [n.stop()]) - - offline_addresses = self.get_routable_addresses().difference(self.get_online_addresses()) - self.assertSetEqual(offline_addresses, removed_addresses) - - get_nodes_with_stale_contacts = lambda: list(filter(lambda node: any(contact.address in offline_addresses - for contact in node.contacts), - self.nodes + self._seeds)) - - self.assertRaises(AssertionError, self.verify_all_nodes_are_routable) - self.assertGreater(len(get_nodes_with_stale_contacts()), 1) - - # run the network long enough for two failures to happen - self.pump_clock(constants.checkRefreshInterval * 3) - - self.assertEqual(len(get_nodes_with_stale_contacts()), 0) - self.verify_all_nodes_are_routable() - self.verify_all_nodes_are_pingable() diff --git a/tests/functional/dht/test_contact_rejoin.py b/tests/functional/dht/test_contact_rejoin.py deleted file mode 100644 index 925c7c099..000000000 --- a/tests/functional/dht/test_contact_rejoin.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -from twisted.internet import defer -from lbrynet.dht import constants -from .dht_test_environment import TestKademliaBase - -log = logging.getLogger() - - -class TestReJoin(TestKademliaBase): - network_size = 40 - - @defer.inlineCallbacks - def setUp(self): - yield super().setUp() - self.removed_node = self.nodes[20] - self.nodes.remove(self.removed_node) - yield self.run_reactor(1, [self.removed_node.stop()]) - self.pump_clock(constants.checkRefreshInterval * 2) - self.verify_all_nodes_are_routable() - self.verify_all_nodes_are_pingable() - - @defer.inlineCallbacks - def test_re_join(self): - self.nodes.append(self.removed_node) - yield self.run_reactor( - 31, [self.removed_node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])] - ) - self.pump_clock(constants.checkRefreshInterval*2) - self.verify_all_nodes_are_routable() - self.verify_all_nodes_are_pingable() - - def test_re_join_with_new_ip(self): - self.removed_node.externalIP = "10.43.43.43" - return self.test_re_join() - - def test_re_join_with_new_node_id(self): - self.removed_node.node_id = self.removed_node._generateID() - return self.test_re_join() diff --git a/tests/functional/dht/test_contact_rpc.py b/tests/functional/dht/test_contact_rpc.py deleted file mode 100644 index 62c1b776b..000000000 --- a/tests/functional/dht/test_contact_rpc.py +++ /dev/null @@ -1,285 +0,0 @@ -from binascii import unhexlify - -import time -from twisted.trial import unittest -import logging -from twisted.internet.task import Clock -from twisted.internet import defer -import lbrynet.dht.protocol -import lbrynet.dht.contact -from lbrynet.dht.error import TimeoutError -from lbrynet.dht.node import Node, rpcmethod -from .mock_transport import listenUDP, resolve - -log = logging.getLogger() - - -class KademliaProtocolTest(unittest.TestCase): - """ Test case for the Protocol class """ - - udpPort = 9182 - - def setUp(self): - self._reactor = Clock() - self.node = Node(node_id=b'1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP, - resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) - self.remote_node = Node(node_id=b'2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, - resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) - self.remote_contact = self.node.contact_manager.make_contact(b'2' * 48, '127.0.0.2', 9182, self.node._protocol) - self.us_from_them = self.remote_node.contact_manager.make_contact(b'1' * 48, '127.0.0.1', 9182, - self.remote_node._protocol) - self.node.start_listening() - self.remote_node.start_listening() - - @defer.inlineCallbacks - def tearDown(self): - yield self.node.stop() - yield self.remote_node.stop() - del self._reactor - - @defer.inlineCallbacks - def testReactor(self): - """ Tests if the reactor can start/stop the protocol correctly """ - - d = defer.Deferred() - self._reactor.callLater(1, d.callback, True) - self._reactor.advance(1) - result = yield d - self.assertTrue(result) - - @defer.inlineCallbacks - def testRPCTimeout(self): - """ Tests if a RPC message sent to a dead remote node times out correctly """ - yield self.remote_node.stop() - self._reactor.pump([1 for _ in range(10)]) - self.node.addContact(self.remote_contact) - - @rpcmethod - def fake_ping(*args, **kwargs): - time.sleep(lbrynet.dht.constants.rpcTimeout + 1) - return 'pong' - - real_ping = self.node.ping - real_timeout = lbrynet.dht.constants.rpcTimeout - real_attempts = lbrynet.dht.constants.rpcAttempts - lbrynet.dht.constants.rpcAttempts = 1 - lbrynet.dht.constants.rpcTimeout = 1 - - self.node.ping = fake_ping - # Make sure the contact was added - self.assertFalse(self.remote_contact not in self.node.contacts, - 'Contact not added to fake node (error in test code)') - self.node.start_listening() - - # Run the PING RPC (which should raise a timeout error) - df = self.remote_contact.ping() - - def check_timeout(err): - self.assertEqual(err.type, TimeoutError) - - df.addErrback(check_timeout) - - def reset_values(): - self.node.ping = real_ping - lbrynet.dht.constants.rpcTimeout = real_timeout - lbrynet.dht.constants.rpcAttempts = real_attempts - - # See if the contact was removed due to the timeout - def check_removed_contact(): - self.assertFalse(self.remote_contact in self.node.contacts, - 'Contact was not removed after RPC timeout; check exception types.') - - df.addCallback(lambda _: reset_values()) - - # Stop the reactor if a result arrives (timeout or not) - df.addCallback(lambda _: check_removed_contact()) - self._reactor.pump([1 for _ in range(20)]) - - @defer.inlineCallbacks - def testRPCRequest(self): - """ Tests if a valid RPC request is executed and responded to correctly """ - - yield self.node.addContact(self.remote_contact) - - self.error = None - - def handleError(f): - self.error = 'An RPC error occurred: %s' % f.getErrorMessage() - - def handleResult(result): - expectedResult = b'pong' - if result != expectedResult: - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \ - % (expectedResult, result) - - # Simulate the RPC - df = self.remote_contact.ping() - df.addCallback(handleResult) - df.addErrback(handleError) - - self._reactor.advance(2) - yield df - - self.assertFalse(self.error, self.error) - # The list of sent RPC messages should be empty at this stage - self.assertEqual(len(self.node._protocol._sentMessages), 0, - 'The protocol is still waiting for a RPC result, ' - 'but the transaction is already done!') - - def testRPCAccess(self): - """ Tests invalid RPC requests - Verifies that a RPC request for an existing but unpublished - method is denied, and that the associated (remote) exception gets - raised locally """ - - self.assertRaises(AttributeError, getattr, self.remote_contact, "not_a_rpc_function") - - def testRPCRequestArgs(self): - """ Tests if an RPC requiring arguments is executed correctly """ - - self.node.addContact(self.remote_contact) - self.error = None - - def handleError(f): - self.error = 'An RPC error occurred: %s' % f.getErrorMessage() - - def handleResult(result): - expectedResult = b'pong' - if result != expectedResult: - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \ - (expectedResult, result) - - # Publish the "local" node on the network - self.node.start_listening() - # Simulate the RPC - df = self.remote_contact.ping() - df.addCallback(handleResult) - df.addErrback(handleError) - self._reactor.pump([1 for _ in range(10)]) - self.assertFalse(self.error, self.error) - # The list of sent RPC messages should be empty at this stage - self.assertEqual(len(self.node._protocol._sentMessages), 0, - 'The protocol is still waiting for a RPC result, ' - 'but the transaction is already done!') - - @defer.inlineCallbacks - def testDetectProtocolVersion(self): - original_findvalue = self.remote_node.findValue - fake_blob = unhexlify("AB" * 48) - - @rpcmethod - def findValue(contact, key): - result = original_findvalue(contact, key) - result.pop(b'protocolVersion') - return result - - self.remote_node.findValue = findValue - d = self.remote_contact.findValue(fake_blob) - self._reactor.advance(3) - find_value_response = yield d - self.assertEqual(self.remote_contact.protocolVersion, 0) - self.assertNotIn('protocolVersion', find_value_response) - - self.remote_node.findValue = original_findvalue - d = self.remote_contact.findValue(fake_blob) - self._reactor.advance(3) - find_value_response = yield d - self.assertEqual(self.remote_contact.protocolVersion, 1) - self.assertNotIn('protocolVersion', find_value_response) - - self.remote_node.findValue = findValue - d = self.remote_contact.findValue(fake_blob) - self._reactor.advance(3) - find_value_response = yield d - self.assertEqual(self.remote_contact.protocolVersion, 0) - self.assertNotIn('protocolVersion', find_value_response) - - @defer.inlineCallbacks - def testStoreToPre_0_20_0_Node(self): - def _dont_migrate(contact, method, *args): - return args, {} - - self.remote_node._protocol._migrate_incoming_rpc_args = _dont_migrate - - original_findvalue = self.remote_node.findValue - original_store = self.remote_node.store - - @rpcmethod - def findValue(contact, key): - result = original_findvalue(contact, key) - if b'protocolVersion' in result: - result.pop(b'protocolVersion') - return result - - @rpcmethod - def store(contact, key, value, originalPublisherID=None, self_store=False, **kwargs): - self.assertEqual(len(key), 48) - self.assertSetEqual(set(value.keys()), {b'token', b'lbryid', b'port'}) - self.assertFalse(self_store) - self.assertDictEqual(kwargs, {}) - return original_store( # pylint: disable=too-many-function-args - contact, key, value[b'token'], value[b'port'], originalPublisherID, 0 - ) - - self.remote_node.findValue = findValue - self.remote_node.store = store - - fake_blob = unhexlify("AB" * 48) - - d = self.remote_contact.findValue(fake_blob) - self._reactor.advance(3) - find_value_response = yield d - self.assertEqual(self.remote_contact.protocolVersion, 0) - self.assertNotIn(b'protocolVersion', find_value_response) - token = find_value_response[b'token'] - d = self.remote_contact.store(fake_blob, token, 3333, self.node.node_id, 0) - self._reactor.advance(3) - response = yield d - self.assertEqual(response, b'OK') - self.assertEqual(self.remote_contact.protocolVersion, 0) - self.assertTrue(self.remote_node._dataStore.hasPeersForBlob(fake_blob)) - self.assertEqual(len(self.remote_node._dataStore.getStoringContacts()), 1) - - @defer.inlineCallbacks - def testStoreFromPre_0_20_0_Node(self): - def _dont_migrate(contact, method, *args): - return args - - self.remote_node._protocol._migrate_outgoing_rpc_args = _dont_migrate - - us_from_them = self.remote_node.contact_manager.make_contact(b'1' * 48, '127.0.0.1', self.udpPort, - self.remote_node._protocol) - - fake_blob = unhexlify("AB" * 48) - - d = us_from_them.findValue(fake_blob) - self._reactor.advance(3) - find_value_response = yield d - self.assertEqual(self.remote_contact.protocolVersion, 0) - self.assertNotIn(b'protocolVersion', find_value_response) - token = find_value_response[b'token'] - us_from_them.update_protocol_version(0) - d = self.remote_node._protocol.sendRPC( - us_from_them, b"store", (fake_blob, {b'lbryid': self.remote_node.node_id, b'token': token, b'port': 3333}) - ) - self._reactor.advance(3) - response = yield d - self.assertEqual(response, b'OK') - self.assertEqual(self.remote_contact.protocolVersion, 0) - self.assertTrue(self.node._dataStore.hasPeersForBlob(fake_blob)) - self.assertEqual(len(self.node._dataStore.getStoringContacts()), 1) - self.assertIs(self.node._dataStore.getStoringContacts()[0], self.remote_contact) - - @defer.inlineCallbacks - def test_find_node(self): - self.node.addContact(self.node.contact_manager.make_contact( - self.remote_contact.id, self.remote_contact.address, self.remote_contact.port, self.node._protocol) - ) - result = self.node.findContact(b'0'*48) - for _ in range(6): - self._reactor.advance(1) - self.assertIsNone((yield result)) - result = self.node.findContact(self.remote_contact.id) - for _ in range(6): - self._reactor.advance(1) - self.assertEqual((yield result).id, self.remote_contact.id) diff --git a/tests/functional/dht/test_iterative_find.py b/tests/functional/dht/test_iterative_find.py deleted file mode 100644 index 8e46f8109..000000000 --- a/tests/functional/dht/test_iterative_find.py +++ /dev/null @@ -1,28 +0,0 @@ -from lbrynet.dht import constants -from lbrynet.dht.distance import Distance -import logging - -from tests.functional.dht.dht_test_environment import TestKademliaBase - -log = logging.getLogger() - - -class TestFindNode(TestKademliaBase): - """ - This tests the local routing table lookup for a node, every node should return the sorted k contacts closest - to the querying node (even if the key being looked up is known) - """ - network_size = 35 - - def test_find_node(self): - last_node_id = self.nodes[-1].node_id - to_last_node = Distance(last_node_id) - for n in self.nodes: - find_close_nodes_result = n._routingTable.findCloseNodes(last_node_id, constants.k) - self.assertEqual(len(find_close_nodes_result), constants.k) - found_ids = [c.id for c in find_close_nodes_result] - self.assertListEqual(found_ids, sorted(found_ids, key=lambda x: to_last_node(x))) - if last_node_id in [c.id for c in n.contacts]: - self.assertEqual(found_ids[0], last_node_id) - else: - self.assertNotIn(last_node_id, found_ids) diff --git a/tests/functional/dht/test_store.py b/tests/functional/dht/test_store.py deleted file mode 100644 index b45eef3d7..000000000 --- a/tests/functional/dht/test_store.py +++ /dev/null @@ -1,172 +0,0 @@ -import struct -from binascii import hexlify - -from twisted.internet import defer -from lbrynet.dht import constants -from lbrynet.utils import generate_id -from .dht_test_environment import TestKademliaBase -import logging - -log = logging.getLogger() - - -class TestStoreExpiration(TestKademliaBase): - network_size = 40 - - @defer.inlineCallbacks - def test_nullify_token(self): - blob_hash = generate_id(1) - announcing_node = self.nodes[20] - # announce the blob - announce_d = announcing_node.announceHaveBlob(blob_hash) - self.pump_clock(5+1) - storing_node_ids = yield announce_d - self.assertEqual(len(storing_node_ids), 8) - - for node in set(self.nodes).union(set(self._seeds)): - # now, everyone has the wrong token - node.change_token() - node.change_token() - - announce_d = announcing_node.announceHaveBlob(blob_hash) - self.pump_clock(5+1) - storing_node_ids = yield announce_d - self.assertEqual(len(storing_node_ids), 0) # can't store, wrong tokens, but they get nullified - - announce_d = announcing_node.announceHaveBlob(blob_hash) - self.pump_clock(5+1) - storing_node_ids = yield announce_d - self.assertEqual(len(storing_node_ids), 8) # next attempt succeeds as it refreshes tokens - - @defer.inlineCallbacks - def test_store_and_expire(self): - blob_hash = generate_id(1) - announcing_node = self.nodes[20] - # announce the blob - announce_d = announcing_node.announceHaveBlob(blob_hash) - self.pump_clock(5+1) - storing_node_ids = yield announce_d - all_nodes = set(self.nodes).union(set(self._seeds)) - - # verify the nodes we think stored it did actually store it - storing_nodes = [node for node in all_nodes if hexlify(node.node_id) in storing_node_ids] - self.assertEqual(len(storing_nodes), len(storing_node_ids)) - self.assertEqual(len(storing_nodes), constants.k) - for node in storing_nodes: - self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port), - node._dataStore.getStoringContacts())), [(announcing_node.node_id, - announcing_node.externalIP, - announcing_node.port)]) - self.assertEqual(len(datastore_result), 1) - expanded_peers = [] - for peer in datastore_result: - host = ".".join([str(d) for d in peer[:4]]) - port, = struct.unpack('>H', peer[4:6]) - peer_node_id = peer[6:] - if (host, port, peer_node_id) not in expanded_peers: - expanded_peers.append((peer_node_id, host, port)) - self.assertEqual(expanded_peers[0], - (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort)) - - # verify the announced blob expires in the storing nodes datastores - - self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead - for node in storing_nodes: - self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(len(datastore_result), 0) - self.assertIn(blob_hash, node._dataStore) # the looping call shouldn't have removed it yet - self.assertEqual(len(node._dataStore.getStoringContacts()), 1) - - self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) - for node in storing_nodes: - self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(len(datastore_result), 0) - self.assertEqual(len(node._dataStore.getStoringContacts()), 0) - self.assertNotIn(blob_hash, node._dataStore.keys()) # the looping call should have fired - - @defer.inlineCallbacks - def test_storing_node_went_stale_then_came_back(self): - blob_hash = generate_id(1) - announcing_node = self.nodes[20] - # announce the blob - announce_d = announcing_node.announceHaveBlob(blob_hash) - self.pump_clock(5+1) - storing_node_ids = yield announce_d - all_nodes = set(self.nodes).union(set(self._seeds)) - - # verify the nodes we think stored it did actually store it - storing_nodes = [node for node in all_nodes if hexlify(node.node_id) in storing_node_ids] - self.assertEqual(len(storing_nodes), len(storing_node_ids)) - self.assertEqual(len(storing_nodes), constants.k) - for node in storing_nodes: - self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port), - node._dataStore.getStoringContacts())), [(announcing_node.node_id, - announcing_node.externalIP, - announcing_node.port)]) - self.assertEqual(len(datastore_result), 1) - expanded_peers = [] - for peer in datastore_result: - host = ".".join([str(d) for d in peer[:4]]) - port, = struct.unpack('>H', peer[4:6]) - peer_node_id = peer[6:] - if (host, port, peer_node_id) not in expanded_peers: - expanded_peers.append((peer_node_id, host, port)) - self.assertEqual(expanded_peers[0], - (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort)) - - self.pump_clock(constants.checkRefreshInterval*2) - - # stop the node - self.nodes.remove(announcing_node) - yield self.run_reactor(31, [announcing_node.stop()]) - # run the network for an hour, which should expire the removed node and turn the announced value stale - self.pump_clock(constants.checkRefreshInterval * 5, constants.checkRefreshInterval/2) - self.verify_all_nodes_are_routable() - - # make sure the contact isn't returned as a peer for the blob, but that we still have the entry in the - # datastore in case the node comes back - for node in storing_nodes: - self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(len(datastore_result), 0) - self.assertEqual(len(node._dataStore.getStoringContacts()), 1) - self.assertIn(blob_hash, node._dataStore) - - # # bring the announcing node back online - self.nodes.append(announcing_node) - yield self.run_reactor( - 31, [announcing_node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])] - ) - self.pump_clock(constants.checkRefreshInterval * 2) - self.verify_all_nodes_are_routable() - - # now the announcing node should once again be returned as a peer for the blob - for node in storing_nodes: - self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(len(datastore_result), 1) - self.assertEqual(len(node._dataStore.getStoringContacts()), 1) - self.assertIn(blob_hash, node._dataStore) - - # verify the announced blob expires in the storing nodes datastores - self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead - for node in storing_nodes: - self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(len(datastore_result), 0) - self.assertIn(blob_hash, node._dataStore) # the looping call shouldn't have removed it yet - self.assertEqual(len(node._dataStore.getStoringContacts()), 1) - - self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) - for node in storing_nodes: - self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) - datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(len(datastore_result), 0) - self.assertEqual(len(node._dataStore.getStoringContacts()), 0) - self.assertNotIn(blob_hash, node._dataStore) # the looping call should have fired