diff --git a/lbrynet/tests/functional/dht/__init__.py b/lbrynet/tests/functional/dht/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/tests/functional/dht/dht_test_environment.py b/lbrynet/tests/functional/dht/dht_test_environment.py new file mode 100644 index 000000000..57af2c68a --- /dev/null +++ b/lbrynet/tests/functional/dht/dht_test_environment.py @@ -0,0 +1,174 @@ +import logging +from twisted.trial import unittest +from twisted.internet import defer, task +from lbrynet.dht.node import Node +from mock_transport import resolve, listenUDP, MOCK_DHT_SEED_DNS, mock_node_generator + + +log = logging.getLogger(__name__) + + +class TestKademliaBase(unittest.TestCase): + timeout = 300.0 # timeout for each test + network_size = 16 # including seed nodes + node_ids = None + seed_dns = MOCK_DHT_SEED_DNS + + def _add_next_node(self): + node_id, node_ip = self.mock_node_generator.next() + node = Node(node_id=node_id.decode('hex'), udpPort=4444, peerPort=3333, externalIP=node_ip, + resolve=resolve, listenUDP=listenUDP, callLater=self.clock.callLater, clock=self.clock) + self.nodes.append(node) + return node + + @defer.inlineCallbacks + def add_node(self): + node = self._add_next_node() + yield node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())]) + defer.returnValue(node) + + def get_node(self, node_id): + for node in self.nodes: + if node.node_id == node_id: + return node + raise KeyError(node_id) + + @defer.inlineCallbacks + def pop_node(self): + node = self.nodes.pop() + yield node.stop() + + def pump_clock(self, n, step=0.1, tick_callback=None): + """ + :param n: seconds to run the reactor for + :param step: reactor tick rate (in seconds) + """ + for _ in range(int(n * (1.0 / float(step)))): + self.clock.advance(step) + if tick_callback and callable(tick_callback): + tick_callback(self.clock.seconds()) + + def run_reactor(self, seconds, deferreds, tick_callback=None): + d = defer.DeferredList(deferreds) + self.pump_clock(seconds, tick_callback=tick_callback) + return d + + def get_contacts(self): + contacts = {} + for seed in self._seeds: + contacts[seed] = seed.contacts + for node in self._seeds: + contacts[node] = node.contacts + return contacts + + def get_routable_addresses(self): + known = set() + for n in self._seeds: + known.update([(c.id, c.address, c.port) for c in n.contacts]) + for n in self.nodes: + known.update([(c.id, c.address, c.port) for c in n.contacts]) + addresses = {triple[1] for triple in known} + return addresses + + def get_online_addresses(self): + online = set() + for n in self._seeds: + online.add(n.externalIP) + for n in self.nodes: + online.add(n.externalIP) + return online + + def show_info(self): + known = set() + for n in self._seeds: + known.update([(c.id, c.address, c.port) for c in n.contacts]) + for n in self.nodes: + known.update([(c.id, c.address, c.port) for c in n.contacts]) + + log.info("Routable: %i/%i", len(known), len(self.nodes) + len(self._seeds)) + for n in self._seeds: + log.info("seed %s has %i contacts in %i buckets", n.externalIP, len(n.contacts), + len([b for b in n._routingTable._buckets if b.getContacts()])) + for n in self.nodes: + log.info("node %s has %i contacts in %i buckets", n.externalIP, len(n.contacts), + len([b for b in n._routingTable._buckets if b.getContacts()])) + + @defer.inlineCallbacks + def setUp(self): + self.nodes = [] + self._seeds = [] + self.clock = task.Clock() + self.mock_node_generator = mock_node_generator(mock_node_ids=self.node_ids) + + seed_dl = [] + seeds = sorted(list(self.seed_dns.keys())) + known_addresses = [(seed_name, 4444) for seed_name in seeds] + for seed_dns in seeds: + self._add_next_node() + seed = self.nodes.pop() + self._seeds.append(seed) + seed_dl.append( + seed.start(known_addresses) + ) + yield self.run_reactor(901, seed_dl) + while len(self.nodes + self._seeds) < self.network_size: + network_dl = [] + for i in range(min(10, self.network_size - len(self._seeds) - len(self.nodes))): + network_dl.append(self.add_node()) + yield self.run_reactor(31, network_dl) + self.assertEqual(len(self.nodes + self._seeds), self.network_size) + self.pump_clock(1800) + self.verify_all_nodes_are_routable() + self.verify_all_nodes_are_pingable() + + @defer.inlineCallbacks + def tearDown(self): + dl = [] + while self.nodes: + dl.append(self.pop_node()) # stop all of the nodes + while self._seeds: + dl.append(self._seeds.pop().stop()) # and the seeds + yield defer.DeferredList(dl) + + def verify_all_nodes_are_routable(self): + routable = set() + node_addresses = {node.externalIP for node in self.nodes} + node_addresses = node_addresses.union({node.externalIP for node in self._seeds}) + for node in self._seeds: + contact_addresses = {contact.address for contact in node.contacts} + routable.update(contact_addresses) + for node in self.nodes: + contact_addresses = {contact.address for contact in node.contacts} + routable.update(contact_addresses) + self.assertSetEqual(routable, node_addresses) + + @defer.inlineCallbacks + def verify_all_nodes_are_pingable(self): + ping_replies = {} + ping_dl = [] + contacted = set() + + def _ping_cb(result, node, replies): + replies[node] = result + + for node in self._seeds: + contact_addresses = set() + for contact in node.contacts: + contact_addresses.add(contact.address) + d = contact.ping() + d.addCallback(_ping_cb, contact.address, ping_replies) + contacted.add(contact.address) + ping_dl.append(d) + for node in self.nodes: + contact_addresses = set() + for contact in node.contacts: + contact_addresses.add(contact.address) + d = contact.ping() + d.addCallback(_ping_cb, contact.address, ping_replies) + contacted.add(contact.address) + ping_dl.append(d) + yield self.run_reactor(2, ping_dl) + node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds}) + self.assertSetEqual(node_addresses, contacted) + expected = {node: "pong" for node in contacted} + self.assertDictEqual(ping_replies, expected) diff --git a/lbrynet/tests/functional/dht/mock_transport.py b/lbrynet/tests/functional/dht/mock_transport.py new file mode 100644 index 000000000..3ce0bae76 --- /dev/null +++ b/lbrynet/tests/functional/dht/mock_transport.py @@ -0,0 +1,149 @@ +import struct +import logging +from twisted.internet import defer, error +from lbrynet.core.utils import generate_id +from lbrynet.dht.encoding import Bencode +from lbrynet.dht.error import DecodeError +from lbrynet.dht.msgformat import DefaultFormat +from lbrynet.dht.msgtypes import ResponseMessage, RequestMessage, ErrorMessage + +_encode = Bencode() +_datagram_formatter = DefaultFormat() + +log = logging.getLogger() + +MOCK_DHT_NODES = [ + "cc8db9d0dd9b65b103594b5f992adf09f18b310958fa451d61ce8d06f3ee97a91461777c2b7dea1a89d02d2f23eb0e4f", + "83a3a398eead3f162fbbe1afb3d63482bb5b6d3cdd8f9b0825c1dfa58dffd3f6f6026d6e64d6d4ae4c3dfe2262e734ba", + "b6928ff25778a7bbb5d258d3b3a06e26db1654f3d2efce8c26681d43f7237cdf2e359a4d309c4473d5d89ec99fb4f573", +] + +MOCK_DHT_SEED_DNS = { # these map to mock nodes 0, 1, and 2 + "lbrynet1.lbry.io": "10.42.42.1", + "lbrynet2.lbry.io": "10.42.42.2", + "lbrynet3.lbry.io": "10.42.42.3", + "lbrynet4.lbry.io": "10.42.42.4", + "lbrynet5.lbry.io": "10.42.42.5", + "lbrynet6.lbry.io": "10.42.42.6", + "lbrynet7.lbry.io": "10.42.42.7", + "lbrynet8.lbry.io": "10.42.42.8", + "lbrynet9.lbry.io": "10.42.42.9", + "lbrynet10.lbry.io": "10.42.42.10", + "lbrynet11.lbry.io": "10.42.42.11", + "lbrynet12.lbry.io": "10.42.42.12", + "lbrynet13.lbry.io": "10.42.42.13", + "lbrynet14.lbry.io": "10.42.42.14", + "lbrynet15.lbry.io": "10.42.42.15", + "lbrynet16.lbry.io": "10.42.42.16", +} + + +def resolve(name, timeout=(1, 3, 11, 45)): + if name not in MOCK_DHT_SEED_DNS: + return defer.fail(error.DNSLookupError(name)) + return defer.succeed(MOCK_DHT_SEED_DNS[name]) + + +class MockUDPTransport(object): + def __init__(self, address, port, max_packet_size, protocol): + self.address = address + self.port = port + self.max_packet_size = max_packet_size + self._node = protocol._node + + def write(self, data, address): + if address in MockNetwork.peers: + dest = MockNetwork.peers[address][0] + debug_kademlia_packet(data, (self.address, self.port), address, self._node) + dest.datagramReceived(data, (self.address, self.port)) + else: # the node is sending to an address that doesnt currently exist, act like it never arrived + pass + + +class MockUDPPort(object): + def __init__(self, protocol, remover): + self.protocol = protocol + self._remover = remover + + def startListening(self, reason=None): + return self.protocol.startProtocol() + + def stopListening(self, reason=None): + result = self.protocol.stopProtocol() + self._remover() + return result + + +class MockNetwork(object): + peers = {} # (interface, port): (protocol, max_packet_size) + + @classmethod + def add_peer(cls, port, protocol, interface, maxPacketSize): + interface = protocol._node.externalIP + protocol.transport = MockUDPTransport(interface, port, maxPacketSize, protocol) + cls.peers[(interface, port)] = (protocol, maxPacketSize) + + def remove_peer(): + del protocol.transport + if (interface, port) in cls.peers: + del cls.peers[(interface, port)] + + return remove_peer + + +def listenUDP(port, protocol, interface='', maxPacketSize=8192): + remover = MockNetwork.add_peer(port, protocol, interface, maxPacketSize) + port = MockUDPPort(protocol, remover) + port.startListening() + return port + + +def address_generator(address=(10, 42, 42, 1)): + def increment(addr): + value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]))[0] + 1 + new_addr = [] + for i in range(4): + new_addr.append(value % 256) + value >>= 8 + return tuple(new_addr[::-1]) + + while True: + yield "{}.{}.{}.{}".format(*address) + address = increment(address) + + +def mock_node_generator(count=None, mock_node_ids=MOCK_DHT_NODES): + if mock_node_ids is None: + mock_node_ids = MOCK_DHT_NODES + mock_node_ids = list(mock_node_ids) + + for num, node_ip in enumerate(address_generator()): + if count and num >= count: + break + if num >= len(mock_node_ids): + node_id = generate_id().encode('hex') + else: + node_id = mock_node_ids[num] + yield (node_id, node_ip) + + +def debug_kademlia_packet(data, source, destination, node): + if log.level != logging.DEBUG: + return + try: + packet = _datagram_formatter.fromPrimitive(_encode.decode(data)) + if isinstance(packet, RequestMessage): + log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request, + node.clock.seconds()) + elif isinstance(packet, ResponseMessage): + if isinstance(packet.response, (str, unicode)): + log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response, + node.clock.seconds()) + else: + log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0], + len(packet.response), node.clock.seconds()) + elif isinstance(packet, ErrorMessage): + log.error("error %s <-- %s %s (node time %s)", destination[0], source[0], packet.exceptionType, + node.clock.seconds()) + except DecodeError: + log.exception("decode error %s --> %s (node time %s)", source[0], destination[0], node.clock.seconds()) diff --git a/lbrynet/tests/functional/dht/test_bootstrap_network.py b/lbrynet/tests/functional/dht/test_bootstrap_network.py new file mode 100644 index 000000000..e31c87fe0 --- /dev/null +++ b/lbrynet/tests/functional/dht/test_bootstrap_network.py @@ -0,0 +1,10 @@ +from dht_test_environment import TestKademliaBase + + +class TestKademliaBootstrap(TestKademliaBase): + """ + Test initializing the network / connecting the seed nodes + """ + + def test_bootstrap_seed_nodes(self): + pass diff --git a/lbrynet/tests/functional/dht/test_contact_rpc.py b/lbrynet/tests/functional/dht/test_contact_rpc.py new file mode 100644 index 000000000..14641a011 --- /dev/null +++ b/lbrynet/tests/functional/dht/test_contact_rpc.py @@ -0,0 +1,200 @@ +import time +import unittest +import logging +from twisted.internet.task import Clock +from twisted.internet import defer +import lbrynet.dht.protocol +import lbrynet.dht.contact +import lbrynet.dht.constants +import lbrynet.dht.msgtypes +from lbrynet.dht.error import TimeoutError +from lbrynet.dht.node import Node, rpcmethod +from lbrynet.core.call_later_manager import CallLaterManager +from mock_transport import listenUDP, resolve + + +log = logging.getLogger() + + +class KademliaProtocolTest(unittest.TestCase): + """ Test case for the Protocol class """ + + udpPort = 9182 + + def setUp(self): + self._reactor = Clock() + CallLaterManager.setup(self._reactor.callLater) + self.node = Node(node_id='1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP, + resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) + + def tearDown(self): + CallLaterManager.stop() + del self._reactor + + @defer.inlineCallbacks + def testReactor(self): + """ Tests if the reactor can start/stop the protocol correctly """ + + d = defer.Deferred() + self._reactor.callLater(1, d.callback, True) + self._reactor.advance(1) + result = yield d + self.assertTrue(result) + + def testRPCTimeout(self): + """ Tests if a RPC message sent to a dead remote node times out correctly """ + dead_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, + resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) + dead_node.start_listening() + dead_node.stop() + self._reactor.pump([1 for _ in range(10)]) + dead_contact = self.node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) + self.node.addContact(dead_contact) + + @rpcmethod + def fake_ping(*args, **kwargs): + time.sleep(lbrynet.dht.constants.rpcTimeout + 1) + return 'pong' + + real_ping = self.node.ping + real_timeout = lbrynet.dht.constants.rpcTimeout + real_attempts = lbrynet.dht.constants.rpcAttempts + lbrynet.dht.constants.rpcAttempts = 1 + lbrynet.dht.constants.rpcTimeout = 1 + + self.node.ping = fake_ping + # Make sure the contact was added + self.failIf(dead_contact not in self.node.contacts, + 'Contact not added to fake node (error in test code)') + self.node.start_listening() + + # Run the PING RPC (which should raise a timeout error) + df = self.node._protocol.sendRPC(dead_contact, 'ping', {}) + + def check_timeout(err): + self.assertEqual(err.type, TimeoutError) + + df.addErrback(check_timeout) + + def reset_values(): + self.node.ping = real_ping + lbrynet.dht.constants.rpcTimeout = real_timeout + lbrynet.dht.constants.rpcAttempts = real_attempts + + # See if the contact was removed due to the timeout + def check_removed_contact(): + self.failIf(dead_contact in self.node.contacts, + 'Contact was not removed after RPC timeout; check exception types.') + + df.addCallback(lambda _: reset_values()) + + # Stop the reactor if a result arrives (timeout or not) + df.addCallback(lambda _: check_removed_contact()) + self._reactor.pump([1 for _ in range(20)]) + + def testRPCRequest(self): + """ Tests if a valid RPC request is executed and responded to correctly """ + + remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, + resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) + remote_node.start_listening() + remoteContact = remote_node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) + self.node.addContact(remoteContact) + + self.error = None + + def handleError(f): + self.error = 'An RPC error occurred: %s' % f.getErrorMessage() + + def handleResult(result): + expectedResult = 'pong' + if result != expectedResult: + self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \ + % (expectedResult, result) + + # Publish the "local" node on the network + self.node.start_listening() + # Simulate the RPC + df = remoteContact.ping() + df.addCallback(handleResult) + df.addErrback(handleError) + + for _ in range(10): + self._reactor.advance(1) + + self.failIf(self.error, self.error) + # The list of sent RPC messages should be empty at this stage + self.failUnlessEqual(len(self.node._protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') + + def testRPCAccess(self): + """ Tests invalid RPC requests + Verifies that a RPC request for an existing but unpublished + method is denied, and that the associated (remote) exception gets + raised locally """ + remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, + resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) + remote_node.start_listening() + remote_contact = remote_node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) + self.node.addContact(remote_contact) + + self.error = None + + def handleError(f): + try: + f.raiseException() + except AttributeError, e: + # This is the expected outcome since the remote node did not publish the method + self.error = None + except Exception, e: + self.error = 'The remote method failed, but the wrong exception was raised; ' \ + 'expected AttributeError, got %s' % type(e) + + def handleResult(result): + self.error = 'The remote method executed successfully, returning: "%s"; ' \ + 'this RPC should not have been allowed.' % result + + self.node.start_listening() + self._reactor.pump([1 for _ in range(10)]) + # Simulate the RPC + df = remote_contact.not_a_rpc_function() + df.addCallback(handleResult) + df.addErrback(handleError) + self._reactor.pump([1 for _ in range(10)]) + self.failIf(self.error, self.error) + # The list of sent RPC messages should be empty at this stage + self.failUnlessEqual(len(self.node._protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') + + def testRPCRequestArgs(self): + """ Tests if an RPC requiring arguments is executed correctly """ + remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, + resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) + remote_node.start_listening() + remote_contact = remote_node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) + self.node.addContact(remote_contact) + self.error = None + + def handleError(f): + self.error = 'An RPC error occurred: %s' % f.getErrorMessage() + + def handleResult(result): + expectedResult = 'pong' + if result != expectedResult: + self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \ + (expectedResult, result) + + # Publish the "local" node on the network + self.node.start_listening() + # Simulate the RPC + df = remote_contact.ping() + df.addCallback(handleResult) + df.addErrback(handleError) + self._reactor.pump([1 for _ in range(10)]) + self.failIf(self.error, self.error) + # The list of sent RPC messages should be empty at this stage + self.failUnlessEqual(len(self.node._protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') diff --git a/lbrynet/tests/functional/test_dht.py b/lbrynet/tests/functional/test_dht.py deleted file mode 100644 index 692185880..000000000 --- a/lbrynet/tests/functional/test_dht.py +++ /dev/null @@ -1,274 +0,0 @@ -import time -import logging -from twisted.trial import unittest -from twisted.internet import defer, threads, task -from lbrynet.dht.node import Node -from lbrynet.tests import mocks -from lbrynet.core.utils import generate_id - -log = logging.getLogger("lbrynet.tests.util") -# log.addHandler(logging.StreamHandler()) -# log.setLevel(logging.DEBUG) - - -class TestKademliaBase(unittest.TestCase): - timeout = 300.0 # timeout for each test - network_size = 0 # plus lbrynet1, lbrynet2, and lbrynet3 seed nodes - node_ids = None - seed_dns = mocks.MOCK_DHT_SEED_DNS - - def _add_next_node(self): - node_id, node_ip = self.mock_node_generator.next() - node = Node(node_id=node_id.decode('hex'), udpPort=4444, peerPort=3333, externalIP=node_ip, - resolve=mocks.resolve, listenUDP=mocks.listenUDP, callLater=self.clock.callLater, clock=self.clock) - self.nodes.append(node) - return node - - @defer.inlineCallbacks - def add_node(self): - node = self._add_next_node() - yield node.joinNetwork( - [ - ("lbrynet1.lbry.io", self._seeds[0].port), - ("lbrynet2.lbry.io", self._seeds[1].port), - ("lbrynet3.lbry.io", self._seeds[2].port), - ] - ) - defer.returnValue(node) - - def get_node(self, node_id): - for node in self.nodes: - if node.node_id == node_id: - return node - raise KeyError(node_id) - - @defer.inlineCallbacks - def pop_node(self): - node = self.nodes.pop() - yield node.stop() - - def pump_clock(self, n, step=0.01): - """ - :param n: seconds to run the reactor for - :param step: reactor tick rate (in seconds) - """ - for _ in range(n * 100): - self.clock.advance(step) - - def run_reactor(self, seconds, *deferreds): - dl = [threads.deferToThread(self.pump_clock, seconds)] - for d in deferreds: - dl.append(d) - return defer.DeferredList(dl) - - @defer.inlineCallbacks - def setUp(self): - self.nodes = [] - self._seeds = [] - self.clock = task.Clock() - self.mock_node_generator = mocks.mock_node_generator(mock_node_ids=self.node_ids) - - join_dl = [] - for seed_dns in self.seed_dns: - other_seeds = list(self.seed_dns.keys()) - other_seeds.remove(seed_dns) - - self._add_next_node() - seed = self.nodes.pop() - self._seeds.append(seed) - join_dl.append( - seed.joinNetwork([(other_seed_dns, 4444) for other_seed_dns in other_seeds]) - ) - - if self.network_size: - for _ in range(self.network_size): - join_dl.append(self.add_node()) - yield self.run_reactor(1, *tuple(join_dl)) - self.verify_all_nodes_are_routable() - - @defer.inlineCallbacks - def tearDown(self): - dl = [] - while self.nodes: - dl.append(self.pop_node()) # stop all of the nodes - while self._seeds: - dl.append(self._seeds.pop().stop()) # and the seeds - yield defer.DeferredList(dl) - - def verify_all_nodes_are_routable(self): - routable = set() - node_addresses = {node.externalIP for node in self.nodes} - node_addresses = node_addresses.union({node.externalIP for node in self._seeds}) - for node in self._seeds: - contact_addresses = {contact.address for contact in node.contacts} - routable.update(contact_addresses) - for node in self.nodes: - contact_addresses = {contact.address for contact in node.contacts} - routable.update(contact_addresses) - self.assertSetEqual(routable, node_addresses) - - @defer.inlineCallbacks - def verify_all_nodes_are_pingable(self): - ping_replies = {} - ping_dl = [] - contacted = set() - - def _ping_cb(result, node, replies): - replies[node] = result - - for node in self._seeds: - contact_addresses = set() - for contact in node.contacts: - contact_addresses.add(contact.address) - d = contact.ping() - d.addCallback(_ping_cb, contact.address, ping_replies) - contacted.add(contact.address) - ping_dl.append(d) - for node in self.nodes: - contact_addresses = set() - for contact in node.contacts: - contact_addresses.add(contact.address) - d = contact.ping() - d.addCallback(_ping_cb, contact.address, ping_replies) - contacted.add(contact.address) - ping_dl.append(d) - self.run_reactor(2, *ping_dl) - yield threads.deferToThread(time.sleep, 0.1) - node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds}) - self.assertSetEqual(node_addresses, contacted) - self.assertDictEqual(ping_replies, {node: "pong" for node in contacted}) - - -class TestKademliaBootstrap(TestKademliaBase): - """ - Test initializing the network / connecting the seed nodes - """ - - def test_bootstrap_network(self): # simulates the real network, which has three seeds - self.assertEqual(len(self._seeds[0].contacts), 2) - self.assertEqual(len(self._seeds[1].contacts), 2) - self.assertEqual(len(self._seeds[2].contacts), 2) - - self.assertSetEqual( - {self._seeds[0].contacts[0].address, self._seeds[0].contacts[1].address}, - {self._seeds[1].externalIP, self._seeds[2].externalIP} - ) - - self.assertSetEqual( - {self._seeds[1].contacts[0].address, self._seeds[1].contacts[1].address}, - {self._seeds[0].externalIP, self._seeds[2].externalIP} - ) - - self.assertSetEqual( - {self._seeds[2].contacts[0].address, self._seeds[2].contacts[1].address}, - {self._seeds[0].externalIP, self._seeds[1].externalIP} - ) - - def test_all_nodes_are_pingable(self): - return self.verify_all_nodes_are_pingable() - - -class TestKademliaBootstrapSixteenSeeds(TestKademliaBase): - node_ids = [ - '000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000', - '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111', - '222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222', - '333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333', - '444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444', - '555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555', - '666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666', - '777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777', - '888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888', - '999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999', - 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', - 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', - 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc', - 'dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd', - 'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee', - 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff' - ] - - @defer.inlineCallbacks - def setUp(self): - self.seed_dns.update( - { - "lbrynet4.lbry.io": "10.42.42.4", - "lbrynet5.lbry.io": "10.42.42.5", - "lbrynet6.lbry.io": "10.42.42.6", - "lbrynet7.lbry.io": "10.42.42.7", - "lbrynet8.lbry.io": "10.42.42.8", - "lbrynet9.lbry.io": "10.42.42.9", - "lbrynet10.lbry.io": "10.42.42.10", - "lbrynet11.lbry.io": "10.42.42.11", - "lbrynet12.lbry.io": "10.42.42.12", - "lbrynet13.lbry.io": "10.42.42.13", - "lbrynet14.lbry.io": "10.42.42.14", - "lbrynet15.lbry.io": "10.42.42.15", - "lbrynet16.lbry.io": "10.42.42.16", - } - ) - yield TestKademliaBase.setUp(self) - - @defer.inlineCallbacks - def tearDown(self): - yield TestKademliaBase.tearDown(self) - - def test_bootstrap_network(self): - pass - - def _test_all_nodes_are_pingable(self): - return self.verify_all_nodes_are_pingable() - - -class Test250NodeNetwork(TestKademliaBase): - network_size = 250 - - def test_setup_network_and_verify_connectivity(self): - pass - - def update_network(self): - import random - dl = [] - announced_blobs = [] - - for node in self.nodes: # random events - if random.randint(0, 10000) < 75 and announced_blobs: # get peers for a blob - log.info('find blob') - blob_hash = random.choice(announced_blobs) - dl.append(node.getPeersForBlob(blob_hash)) - if random.randint(0, 10000) < 25: # announce a blob - log.info('announce blob') - blob_hash = generate_id() - announced_blobs.append((blob_hash, node.node_id)) - dl.append(node.announceHaveBlob(blob_hash)) - - random.shuffle(self.nodes) - - # kill nodes - while random.randint(0, 100) > 95: - dl.append(self.pop_node()) - log.info('pop node') - - # add nodes - while random.randint(0, 100) > 95: - dl.append(self.add_node()) - log.info('add node') - return tuple(dl), announced_blobs - - @defer.inlineCallbacks - def _test_simulate_network(self): - total_blobs = [] - for i in range(100): - d, blobs = self.update_network() - total_blobs.extend(blobs) - self.run_reactor(1, *d) - yield threads.deferToThread(time.sleep, 0.1) - routable = set() - node_addresses = {node.externalIP for node in self.nodes} - for node in self.nodes: - contact_addresses = {contact.address for contact in node.contacts} - routable.update(contact_addresses) - log.warning("difference: %i", len(node_addresses.difference(routable))) - log.info("blobs %i", len(total_blobs)) - log.info("step %i, %i nodes", i, len(self.nodes)) - self.pump_clock(100) diff --git a/lbrynet/tests/mocks.py b/lbrynet/tests/mocks.py index 12770c188..5074e0531 100644 --- a/lbrynet/tests/mocks.py +++ b/lbrynet/tests/mocks.py @@ -1,21 +1,18 @@ import base64 -import struct import io from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization -from twisted.internet import defer, error +from twisted.internet import defer from twisted.python.failure import Failure from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import RequestCanceledError from lbrynet.core import BlobAvailability -from lbrynet.core.utils import generate_id from lbrynet.dht.node import Node as RealNode from lbrynet.daemon import ExchangeRateManager as ERM from lbrynet import conf -from util import debug_kademlia_packet KB = 2**10 PUBLIC_EXPONENT = 65537 # http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html @@ -41,6 +38,9 @@ class Node(RealNode): def stop(self): return defer.succeed(None) + def start(self, known_node_addresses=None): + return self.joinNetwork(known_node_addresses) + class FakeNetwork(object): @staticmethod @@ -188,9 +188,15 @@ class Wallet(object): def get_info_exchanger(self): return PointTraderKeyExchanger(self) + def update_peer_address(self, peer, address): + pass + def get_wallet_info_query_handler_factory(self): return PointTraderKeyQueryHandlerFactory(self) + def get_unused_address_for_peer(self, peer): + return defer.succeed("bDtL6qriyimxz71DSYjojTBsm6cpM1bqmj") + def reserve_points(self, *args): return True @@ -250,18 +256,12 @@ class Announcer(object): def immediate_announce(self, *args): pass - def run_manage_loop(self): - pass - def start(self): pass def stop(self): pass - def get_next_announce_time(self): - return 0 - class GenFile(io.RawIOBase): def __init__(self, size, pattern): @@ -410,89 +410,3 @@ def mock_conf_settings(obj, settings={}): conf.settings = original_settings obj.addCleanup(_reset_settings) - - -MOCK_DHT_NODES = [ - "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", - "DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF", -] - -MOCK_DHT_SEED_DNS = { # these map to mock nodes 0, 1, and 2 - "lbrynet1.lbry.io": "10.42.42.1", - "lbrynet2.lbry.io": "10.42.42.2", - "lbrynet3.lbry.io": "10.42.42.3", -} - - -def resolve(name, timeout=(1, 3, 11, 45)): - if name not in MOCK_DHT_SEED_DNS: - return defer.fail(error.DNSLookupError(name)) - return defer.succeed(MOCK_DHT_SEED_DNS[name]) - - -class MockUDPTransport(object): - def __init__(self, address, port, max_packet_size, protocol): - self.address = address - self.port = port - self.max_packet_size = max_packet_size - self._node = protocol._node - - def write(self, data, address): - dest = MockNetwork.peers[address][0] - debug_kademlia_packet(data, (self.address, self.port), address, self._node) - dest.datagramReceived(data, (self.address, self.port)) - - -class MockUDPPort(object): - def __init__(self, protocol): - self.protocol = protocol - - def startListening(self, reason=None): - return self.protocol.startProtocol() - - def stopListening(self, reason=None): - return self.protocol.stopProtocol() - - -class MockNetwork(object): - peers = {} # (interface, port): (protocol, max_packet_size) - - @classmethod - def add_peer(cls, port, protocol, interface, maxPacketSize): - interface = protocol._node.externalIP - protocol.transport = MockUDPTransport(interface, port, maxPacketSize, protocol) - cls.peers[(interface, port)] = (protocol, maxPacketSize) - - -def listenUDP(port, protocol, interface='', maxPacketSize=8192): - MockNetwork.add_peer(port, protocol, interface, maxPacketSize) - return MockUDPPort(protocol) - - -def address_generator(address=(10, 42, 42, 1)): - def increment(addr): - value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]))[0] + 1 - new_addr = [] - for i in range(4): - new_addr.append(value % 256) - value >>= 8 - return tuple(new_addr[::-1]) - - while True: - yield "{}.{}.{}.{}".format(*address) - address = increment(address) - - -def mock_node_generator(count=None, mock_node_ids=MOCK_DHT_NODES): - if mock_node_ids is None: - mock_node_ids = MOCK_DHT_NODES - - for num, node_ip in enumerate(address_generator()): - if count and num >= count: - break - if num >= len(mock_node_ids): - node_id = generate_id().encode('hex') - else: - node_id = mock_node_ids[num] - yield (node_id, node_ip) diff --git a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py index 2f67d5567..60021ffc9 100644 --- a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py +++ b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py @@ -1,82 +1,55 @@ -import tempfile -import shutil from twisted.trial import unittest -from twisted.internet import defer, reactor, threads +from twisted.internet import defer, task +from lbrynet.core import utils from lbrynet.tests.util import random_lbry_hash -from lbrynet.dht.hashannouncer import DHTHashAnnouncer -from lbrynet.core.call_later_manager import CallLaterManager -from lbrynet.database.storage import SQLiteStorage - class MocDHTNode(object): - def __init__(self, announce_will_fail=False): - # if announce_will_fail is True, - # announceHaveBlob will return empty dict - self.call_later_manager = CallLaterManager - self.call_later_manager.setup(reactor.callLater) + def __init__(self): self.blobs_announced = 0 - self.announce_will_fail = announce_will_fail def announceHaveBlob(self, blob): - if self.announce_will_fail: - return_val = {} - else: - return_val = {blob: ["ab"*48]} - self.blobs_announced += 1 - d = defer.Deferred() - self.call_later_manager.call_later(1, d.callback, return_val) - return d + return defer.succeed(True) +class MocSupplier(object): + def __init__(self, blobs_to_announce): + self.blobs_to_announce = blobs_to_announce + self.announced = False + def hashes_to_announce(self): + if not self.announced: + self.announced = True + return defer.succeed(self.blobs_to_announce) + else: + return defer.succeed([]) class DHTHashAnnouncerTest(unittest.TestCase): - @defer.inlineCallbacks + def setUp(self): - from lbrynet.conf import initialize_settings - initialize_settings(False) self.num_blobs = 10 self.blobs_to_announce = [] for i in range(0, self.num_blobs): self.blobs_to_announce.append(random_lbry_hash()) + self.clock = task.Clock() self.dht_node = MocDHTNode() - self.dht_node.peerPort = 3333 - self.dht_node.clock = reactor - self.db_dir = tempfile.mkdtemp() - self.storage = SQLiteStorage(self.db_dir) - yield self.storage.setup() - self.announcer = DHTHashAnnouncer(self.dht_node, self.storage, 10) - for blob_hash in self.blobs_to_announce: - yield self.storage.add_completed_blob(blob_hash, 100, 0, 1) + utils.call_later = self.clock.callLater + from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer + self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333) + self.supplier = MocSupplier(self.blobs_to_announce) + self.announcer.add_supplier(self.supplier) - @defer.inlineCallbacks - def tearDown(self): - self.dht_node.call_later_manager.stop() - yield self.storage.stop() - yield threads.deferToThread(shutil.rmtree, self.db_dir) - - @defer.inlineCallbacks - def test_announce_fail(self): - # test what happens when node.announceHaveBlob() returns empty dict - self.dht_node.announce_will_fail = True - d = yield self.announcer.manage() - yield d - - @defer.inlineCallbacks def test_basic(self): - d = self.announcer.immediate_announce(self.blobs_to_announce) - self.assertEqual(len(self.announcer.hash_queue), self.num_blobs) - yield d + self.announcer._announce_available_hashes() + self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS) + self.clock.advance(1) self.assertEqual(self.dht_node.blobs_announced, self.num_blobs) - self.assertEqual(len(self.announcer.hash_queue), 0) + self.assertEqual(self.announcer.hash_queue_size(), 0) - @defer.inlineCallbacks def test_immediate_announce(self): # Test that immediate announce puts a hash at the front of the queue - d = self.announcer.immediate_announce(self.blobs_to_announce) - self.assertEqual(len(self.announcer.hash_queue), self.num_blobs) + self.announcer._announce_available_hashes() blob_hash = random_lbry_hash() self.announcer.immediate_announce([blob_hash]) - self.assertEqual(len(self.announcer.hash_queue), self.num_blobs+1) - self.assertEqual(blob_hash, self.announcer.hash_queue[-1]) - yield d + self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS+1) + self.assertEqual(blob_hash, self.announcer.hash_queue[0][0]) + diff --git a/lbrynet/tests/unit/dht/test_contact.py b/lbrynet/tests/unit/dht/test_contact.py index bcd34c8f9..b150e2fbf 100644 --- a/lbrynet/tests/unit/dht/test_contact.py +++ b/lbrynet/tests/unit/dht/test_contact.py @@ -1,16 +1,24 @@ -import unittest - -from lbrynet.dht import contact +from twisted.internet import task +from twisted.trial import unittest +from lbrynet.core.utils import generate_id +from lbrynet.dht.contact import ContactManager +from lbrynet.dht import constants class ContactOperatorsTest(unittest.TestCase): """ Basic tests case for boolean operators on the Contact class """ def setUp(self): - self.firstContact = contact.Contact('firstContactID', '127.0.0.1', 1000, None, 1) - self.secondContact = contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32) - self.secondContactCopy = contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32) - self.firstContactDifferentValues = contact.Contact( - 'firstContactID', '192.168.1.20', 1000, None, 50) + self.contact_manager = ContactManager() + self.node_ids = [generate_id(), generate_id(), generate_id()] + self.firstContact = self.contact_manager.make_contact(self.node_ids[1], '127.0.0.1', 1000, None, 1) + self.secondContact = self.contact_manager.make_contact(self.node_ids[0], '192.168.0.1', 1000, None, 32) + self.secondContactCopy = self.contact_manager.make_contact(self.node_ids[0], '192.168.0.1', 1000, None, 32) + self.firstContactDifferentValues = self.contact_manager.make_contact(self.node_ids[1], '192.168.1.20', + 1000, None, 50) + + def testNoDuplicateContactObjects(self): + self.assertTrue(self.secondContact is self.secondContactCopy) + self.assertTrue(self.firstContact is not self.firstContactDifferentValues) def testBoolean(self): """ Test "equals" and "not equals" comparisons """ @@ -24,15 +32,6 @@ class ContactOperatorsTest(unittest.TestCase): self.secondContact, self.secondContactCopy, 'Different copies of the same Contact instance should be equal') - def testStringComparisons(self): - """ Test comparisons of Contact objects with str types """ - self.failUnlessEqual( - 'firstContactID', self.firstContact, - 'The node ID string must be equal to the contact object') - self.failIfEqual( - 'some random string', self.firstContact, - "The tested string should not be equal to the contact object (not equal to it's ID)") - def testIllogicalComparisons(self): """ Test comparisons with non-Contact and non-str types """ msg = '"{}" operator: Contact object should not be equal to {} type' diff --git a/lbrynet/tests/unit/dht/test_datastore.py b/lbrynet/tests/unit/dht/test_datastore.py index 9d50e4070..a431f4aac 100644 --- a/lbrynet/tests/unit/dht/test_datastore.py +++ b/lbrynet/tests/unit/dht/test_datastore.py @@ -4,19 +4,19 @@ # the GNU Lesser General Public License Version 3, or any later version. # See the COPYING file included in this archive -import unittest +from twisted.trial import unittest import time - -import lbrynet.dht.datastore -import lbrynet.dht.constants - import hashlib +from lbrynet.dht.datastore import DictDataStore +from lbrynet.dht import constants + + class DictDataStoreTest(unittest.TestCase): """ Basic tests case for the reference DataStore API and implementation """ def setUp(self): - self.ds = lbrynet.dht.datastore.DictDataStore() - h = hashlib.sha1() + self.ds = DictDataStore() + h = hashlib.sha384() h.update('g') hashKey = h.digest() h2 = hashlib.sha1() @@ -78,7 +78,7 @@ class DictDataStoreTest(unittest.TestCase): h2 = hashlib.sha1() h2.update('test2') key2 = h2.digest() - td = lbrynet.dht.constants.dataExpireTimeout - 100 + td = constants.dataExpireTimeout - 100 td2 = td + td self.ds.addPeerToBlob(h1, 'val1', now - td, now - td, '1') self.ds.addPeerToBlob(h1, 'val2', now - td2, now - td2, '2') @@ -128,16 +128,3 @@ class DictDataStoreTest(unittest.TestCase): # # # Read back the meta-data # for key, value in self.cases: - - - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(DictDataStoreTest)) - return suite - - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) diff --git a/lbrynet/tests/unit/dht/test_encoding.py b/lbrynet/tests/unit/dht/test_encoding.py index 159b401da..042a664f3 100644 --- a/lbrynet/tests/unit/dht/test_encoding.py +++ b/lbrynet/tests/unit/dht/test_encoding.py @@ -4,10 +4,10 @@ # the GNU Lesser General Public License Version 3, or any later version. # See the COPYING file included in this archive -import unittest - +from twisted.trial import unittest import lbrynet.dht.encoding + class BencodeTest(unittest.TestCase): """ Basic tests case for the Bencode implementation """ def setUp(self): @@ -16,7 +16,7 @@ class BencodeTest(unittest.TestCase): self.cases = ((42, 'i42e'), ('spam', '4:spam'), (['spam', 42], 'l4:spami42ee'), - ({'foo':42, 'bar':'spam'}, 'd3:bar4:spam3:fooi42ee'), + ({'foo': 42, 'bar': 'spam'}, 'd3:bar4:spam3:fooi42ee'), # ...and now the "real life" tests ([['abc', '127.0.0.1', 1919], ['def', '127.0.0.1', 1921]], 'll3:abc9:127.0.0.1i1919eel3:def9:127.0.0.1i1921eee')) @@ -45,12 +45,3 @@ class BencodeTest(unittest.TestCase): for encodedValue in self.badDecoderCases: self.failUnlessRaises( lbrynet.dht.encoding.DecodeError, self.encoding.decode, encodedValue) - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(BencodeTest)) - return suite - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) diff --git a/lbrynet/tests/unit/dht/test_kbucket.py b/lbrynet/tests/unit/dht/test_kbucket.py index ebfbb5478..2896076b8 100644 --- a/lbrynet/tests/unit/dht/test_kbucket.py +++ b/lbrynet/tests/unit/dht/test_kbucket.py @@ -4,23 +4,41 @@ # the GNU Lesser General Public License Version 3, or any later version. # See the COPYING file included in this archive -import unittest - +from twisted.trial import unittest +import struct +from lbrynet.core.utils import generate_id from lbrynet.dht import kbucket -import lbrynet.dht.contact as contact +from lbrynet.dht.contact import ContactManager from lbrynet.dht import constants + +def address_generator(address=(10, 42, 42, 1)): + def increment(addr): + value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]))[0] + 1 + new_addr = [] + for i in range(4): + new_addr.append(value % 256) + value >>= 8 + return tuple(new_addr[::-1]) + + while True: + yield "{}.{}.{}.{}".format(*address) + address = increment(address) + + class KBucketTest(unittest.TestCase): """ Test case for the KBucket class """ def setUp(self): - self.kbucket = kbucket.KBucket(0, 2**160) + self.address_generator = address_generator() + self.contact_manager = ContactManager() + self.kbucket = kbucket.KBucket(0, 2**constants.key_bits, generate_id()) def testAddContact(self): """ Tests if the bucket handles contact additions/updates correctly """ # Test if contacts can be added to empty list # Add k contacts to bucket for i in range(constants.k): - tmpContact = contact.Contact('tempContactID%d' % i, str(i), i, i) + tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None) self.kbucket.addContact(tmpContact) self.failUnlessEqual( self.kbucket._contacts[i], @@ -28,8 +46,7 @@ class KBucketTest(unittest.TestCase): "Contact in position %d not the same as the newly-added contact" % i) # Test if contact is not added to full list - i += 1 - tmpContact = contact.Contact('tempContactID%d' % i, str(i), i, i) + tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None) self.failUnlessRaises(kbucket.BucketFull, self.kbucket.addContact, tmpContact) # Test if an existing contact is updated correctly if added again @@ -48,14 +65,17 @@ class KBucketTest(unittest.TestCase): # Add k-2 contacts + node_ids = [] if constants.k >= 2: for i in range(constants.k-2): - tmpContact = contact.Contact(i, i, i, i) + node_ids.append(generate_id()) + tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0, None) self.kbucket.addContact(tmpContact) else: # add k contacts for i in range(constants.k): - tmpContact = contact.Contact(i, i, i, i) + node_ids.append(generate_id()) + tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0, None) self.kbucket.addContact(tmpContact) # try to get too many contacts @@ -65,8 +85,8 @@ class KBucketTest(unittest.TestCase): 'Returned list should not have more than k entries!') # verify returned contacts in list - for i in range(constants.k-2): - self.failIf(self.kbucket._contacts[i].id != i, + for node_id, i in zip(node_ids, range(constants.k-2)): + self.failIf(self.kbucket._contacts[i].id != node_id, "Contact in position %s not same as added contact" % (str(i))) # try to get too many contacts @@ -89,25 +109,15 @@ class KBucketTest(unittest.TestCase): def testRemoveContact(self): # try remove contact from empty list - rmContact = contact.Contact('TestContactID1', '127.0.0.1', 1, 1) + rmContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None) self.failUnlessRaises(ValueError, self.kbucket.removeContact, rmContact) # Add couple contacts for i in range(constants.k-2): - tmpContact = contact.Contact('tmpTestContactID%d' % i, str(i), i, i) + tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None) self.kbucket.addContact(tmpContact) # try remove contact from empty list self.kbucket.addContact(rmContact) result = self.kbucket.removeContact(rmContact) self.failIf(rmContact in self.kbucket._contacts, "Could not remove contact from bucket") - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(KBucketTest)) - return suite - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) diff --git a/lbrynet/tests/unit/dht/test_messages.py b/lbrynet/tests/unit/dht/test_messages.py index 36c2295b9..6319901c6 100644 --- a/lbrynet/tests/unit/dht/test_messages.py +++ b/lbrynet/tests/unit/dht/test_messages.py @@ -4,7 +4,7 @@ # the GNU Lesser General Public License Version 3, or any later version. # See the COPYING file included in this archive -import unittest +from twisted.trial import unittest from lbrynet.dht.msgtypes import RequestMessage, ResponseMessage, ErrorMessage from lbrynet.dht.msgformat import MessageTranslator, DefaultFormat diff --git a/lbrynet/tests/unit/dht/test_node.py b/lbrynet/tests/unit/dht/test_node.py index ab73ba3e8..c612d7593 100644 --- a/lbrynet/tests/unit/dht/test_node.py +++ b/lbrynet/tests/unit/dht/test_node.py @@ -5,20 +5,21 @@ # See the COPYING file included in this archive import hashlib -import unittest +from twisted.trial import unittest import struct from twisted.internet import protocol, defer, selectreactor from lbrynet.dht.msgtypes import ResponseMessage -import lbrynet.dht.node -import lbrynet.dht.constants -import lbrynet.dht.datastore +from lbrynet.dht.node import Node +from lbrynet.dht import constants +from lbrynet.dht.datastore import DictDataStore +from lbrynet.dht.routingtable import TreeRoutingTable class NodeIDTest(unittest.TestCase): """ Test case for the Node class's ID """ def setUp(self): - self.node = lbrynet.dht.node.Node() + self.node = Node() def testAutoCreatedID(self): """ Tests if a new node has a valid node ID """ @@ -49,12 +50,10 @@ class NodeIDTest(unittest.TestCase): class NodeDataTest(unittest.TestCase): """ Test case for the Node class's data-related functions """ def setUp(self): - import lbrynet.dht.contact h = hashlib.sha384() h.update('test') - self.node = lbrynet.dht.node.Node() - self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345, - self.node._protocol) + self.node = Node() + self.contact = self.node.contact_manager.make_contact(h.digest(), '127.0.0.1', 12345, self.node._protocol) self.token = self.node.make_token(self.contact.compact_ip()) self.cases = [] for i in xrange(5): @@ -65,13 +64,8 @@ class NodeDataTest(unittest.TestCase): @defer.inlineCallbacks def testStore(self): """ Tests if the node can store (and privately retrieve) some data """ - for key, value in self.cases: - request = { - 'port': value, - 'lbryid': self.contact.id, - 'token': self.token - } - yield self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact) + for key, port in self.cases: + yield self.node.store(self.contact, key, self.token, port, self.contact.id) for key, value in self.cases: expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + \ self.contact.id @@ -85,189 +79,185 @@ class NodeDataTest(unittest.TestCase): class NodeContactTest(unittest.TestCase): """ Test case for the Node class's contact management-related functions """ def setUp(self): - self.node = lbrynet.dht.node.Node() + self.node = Node() + @defer.inlineCallbacks def testAddContact(self): """ Tests if a contact can be added and retrieved correctly """ - import lbrynet.dht.contact # Create the contact h = hashlib.sha384() h.update('node1') contactID = h.digest() - contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.node._protocol) + contact = self.node.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.node._protocol) # Now add it... - self.node.addContact(contact) + yield self.node.addContact(contact) # ...and request the closest nodes to it using FIND_NODE - closestNodes = self.node._routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k) + closestNodes = self.node._routingTable.findCloseNodes(contactID, constants.k) self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; ' 'expected 1, got %d' % len(closestNodes)) self.failUnless(contact in closestNodes, 'Added contact not found by issueing ' '_findCloseNodes()') + @defer.inlineCallbacks def testAddSelfAsContact(self): """ Tests the node's behaviour when attempting to add itself as a contact """ - import lbrynet.dht.contact # Create a contact with the same ID as the local node's ID - contact = lbrynet.dht.contact.Contact(self.node.node_id, '127.0.0.1', 91824, None) + contact = self.node.contact_manager.make_contact(self.node.node_id, '127.0.0.1', 91824, None) # Now try to add it - self.node.addContact(contact) + yield self.node.addContact(contact) # ...and request the closest nodes to it using FIND_NODE closestNodes = self.node._routingTable.findCloseNodes(self.node.node_id, - lbrynet.dht.constants.k) + constants.k) self.failIf(contact in closestNodes, 'Node added itself as a contact') -class FakeRPCProtocol(protocol.DatagramProtocol): - def __init__(self): - self.reactor = selectreactor.SelectReactor() - self.testResponse = None - self.network = None - - def createNetwork(self, contactNetwork): - """ - set up a list of contacts together with their closest contacts - @param contactNetwork: a sequence of tuples, each containing a contact together with its - closest contacts: C{(, )} - """ - self.network = contactNetwork - - def sendRPC(self, contact, method, args, rawResponse=False): - """ Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs""" - - h = hashlib.sha384() - h.update('rpcId') - rpc_id = h.digest()[:20] - - if method == "findNode": - # get the specific contacts closest contacts - closestContacts = [] - closestContactsList = [] - for contactTuple in self.network: - if contact == contactTuple[0]: - # get the list of closest contacts for this contact - closestContactsList = contactTuple[1] - # Pack the closest contacts into a ResponseMessage - for closeContact in closestContactsList: - closestContacts.append((closeContact.id, closeContact.address, closeContact.port)) - - message = ResponseMessage(rpc_id, contact.id, closestContacts) - df = defer.Deferred() - df.callback((message, (contact.address, contact.port))) - return df - elif method == "findValue": - for contactTuple in self.network: - if contact == contactTuple[0]: - # Get the data stored by this remote contact - dataDict = contactTuple[2] - dataKey = dataDict.keys()[0] - data = dataDict.get(dataKey) - # Check if this contact has the requested value - if dataKey == args[0]: - # Return the data value - response = dataDict - print "data found at contact: " + contact.id - else: - # Return the closest contact to the requested data key - print "data not found at contact: " + contact.id - closeContacts = contactTuple[1] - closestContacts = [] - for closeContact in closeContacts: - closestContacts.append((closeContact.id, closeContact.address, - closeContact.port)) - response = closestContacts - - # Create the response message - message = ResponseMessage(rpc_id, contact.id, response) - df = defer.Deferred() - df.callback((message, (contact.address, contact.port))) - return df - - def _send(self, data, rpcID, address): - """ fake sending data """ - - -class NodeLookupTest(unittest.TestCase): - """ Test case for the Node class's iterativeFind node lookup algorithm """ - - def setUp(self): - # create a fake protocol to imitate communication with other nodes - self._protocol = FakeRPCProtocol() - # Note: The reactor is never started for this test. All deferred calls run sequentially, - # since there is no asynchronous network communication - # create the node to be tested in isolation - h = hashlib.sha384() - h.update('node1') - node_id = str(h.digest()) - self.node = lbrynet.dht.node.Node(node_id=node_id, udpPort=4000, networkProtocol=self._protocol) - self.updPort = 81173 - self.contactsAmount = 80 - # Reinitialise the routing table - self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable( - self.node.node_id) - - # create 160 bit node ID's for test purposes - self.testNodeIDs = [] - idNum = int(self.node.node_id.encode('hex'), 16) - for i in range(self.contactsAmount): - # create the testNodeIDs in ascending order, away from the actual node ID, - # with regards to the distance metric - self.testNodeIDs.append(str("%X" % (idNum + i + 1)).decode('hex')) - - # generate contacts - self.contacts = [] - for i in range(self.contactsAmount): - contact = lbrynet.dht.contact.Contact(self.testNodeIDs[i], "127.0.0.1", - self.updPort + i + 1, self._protocol) - self.contacts.append(contact) - - # create the network of contacts in format: (contact, closest contacts) - contactNetwork = ((self.contacts[0], self.contacts[8:15]), - (self.contacts[1], self.contacts[16:23]), - (self.contacts[2], self.contacts[24:31]), - (self.contacts[3], self.contacts[32:39]), - (self.contacts[4], self.contacts[40:47]), - (self.contacts[5], self.contacts[48:55]), - (self.contacts[6], self.contacts[56:63]), - (self.contacts[7], self.contacts[64:71]), - (self.contacts[8], self.contacts[72:79]), - (self.contacts[40], self.contacts[41:48]), - (self.contacts[41], self.contacts[41:48]), - (self.contacts[42], self.contacts[41:48]), - (self.contacts[43], self.contacts[41:48]), - (self.contacts[44], self.contacts[41:48]), - (self.contacts[45], self.contacts[41:48]), - (self.contacts[46], self.contacts[41:48]), - (self.contacts[47], self.contacts[41:48]), - (self.contacts[48], self.contacts[41:48]), - (self.contacts[50], self.contacts[0:7]), - (self.contacts[51], self.contacts[8:15]), - (self.contacts[52], self.contacts[16:23])) - - contacts_with_datastores = [] - - for contact_tuple in contactNetwork: - contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], - lbrynet.dht.datastore.DictDataStore())) - self._protocol.createNetwork(contacts_with_datastores) - - @defer.inlineCallbacks - def testNodeBootStrap(self): - """ Test bootstrap with the closest possible contacts """ - - activeContacts = yield self.node._iterativeFind(self.node.node_id, self.contacts[0:8]) - # Set the expected result - expectedResult = set() - for item in self.contacts[0:6]: - expectedResult.add(item.id) - # Get the result from the deferred - - # Check the length of the active contacts - self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), - "More active contacts should exist, there should be %d " - "contacts but there are %d" % (len(expectedResult), - len(activeContacts))) - - # Check that the received active contacts are the same as the input contacts - self.failUnlessEqual({contact.id for contact in activeContacts}, expectedResult, - "Active should only contain the closest possible contacts" - " which were used as input for the boostrap") +# class FakeRPCProtocol(protocol.DatagramProtocol): +# def __init__(self): +# self.reactor = selectreactor.SelectReactor() +# self.testResponse = None +# self.network = None +# +# def createNetwork(self, contactNetwork): +# """ +# set up a list of contacts together with their closest contacts +# @param contactNetwork: a sequence of tuples, each containing a contact together with its +# closest contacts: C{(, )} +# """ +# self.network = contactNetwork +# +# def sendRPC(self, contact, method, args, rawResponse=False): +# """ Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs""" +# +# h = hashlib.sha384() +# h.update('rpcId') +# rpc_id = h.digest()[:20] +# +# if method == "findNode": +# # get the specific contacts closest contacts +# closestContacts = [] +# closestContactsList = [] +# for contactTuple in self.network: +# if contact == contactTuple[0]: +# # get the list of closest contacts for this contact +# closestContactsList = contactTuple[1] +# # Pack the closest contacts into a ResponseMessage +# for closeContact in closestContactsList: +# closestContacts.append((closeContact.id, closeContact.address, closeContact.port)) +# +# message = ResponseMessage(rpc_id, contact.id, closestContacts) +# df = defer.Deferred() +# df.callback((message, (contact.address, contact.port))) +# return df +# elif method == "findValue": +# for contactTuple in self.network: +# if contact == contactTuple[0]: +# # Get the data stored by this remote contact +# dataDict = contactTuple[2] +# dataKey = dataDict.keys()[0] +# data = dataDict.get(dataKey) +# # Check if this contact has the requested value +# if dataKey == args[0]: +# # Return the data value +# response = dataDict +# print "data found at contact: " + contact.id +# else: +# # Return the closest contact to the requested data key +# print "data not found at contact: " + contact.id +# closeContacts = contactTuple[1] +# closestContacts = [] +# for closeContact in closeContacts: +# closestContacts.append((closeContact.id, closeContact.address, +# closeContact.port)) +# response = closestContacts +# +# # Create the response message +# message = ResponseMessage(rpc_id, contact.id, response) +# df = defer.Deferred() +# df.callback((message, (contact.address, contact.port))) +# return df +# +# def _send(self, data, rpcID, address): +# """ fake sending data """ +# +# +# class NodeLookupTest(unittest.TestCase): +# """ Test case for the Node class's iterativeFind node lookup algorithm """ +# +# def setUp(self): +# # create a fake protocol to imitate communication with other nodes +# self._protocol = FakeRPCProtocol() +# # Note: The reactor is never started for this test. All deferred calls run sequentially, +# # since there is no asynchronous network communication +# # create the node to be tested in isolation +# h = hashlib.sha384() +# h.update('node1') +# node_id = str(h.digest()) +# self.node = Node(node_id, 4000, None, None, self._protocol) +# self.updPort = 81173 +# self.contactsAmount = 80 +# # Reinitialise the routing table +# self.node._routingTable = TreeRoutingTable(self.node.node_id) +# +# # create 160 bit node ID's for test purposes +# self.testNodeIDs = [] +# idNum = int(self.node.node_id.encode('hex'), 16) +# for i in range(self.contactsAmount): +# # create the testNodeIDs in ascending order, away from the actual node ID, +# # with regards to the distance metric +# self.testNodeIDs.append(str("%X" % (idNum + i + 1)).decode('hex')) +# +# # generate contacts +# self.contacts = [] +# for i in range(self.contactsAmount): +# contact = self.node.contact_manager.make_contact(self.testNodeIDs[i], "127.0.0.1", +# self.updPort + i + 1, self._protocol) +# self.contacts.append(contact) +# +# # create the network of contacts in format: (contact, closest contacts) +# contactNetwork = ((self.contacts[0], self.contacts[8:15]), +# (self.contacts[1], self.contacts[16:23]), +# (self.contacts[2], self.contacts[24:31]), +# (self.contacts[3], self.contacts[32:39]), +# (self.contacts[4], self.contacts[40:47]), +# (self.contacts[5], self.contacts[48:55]), +# (self.contacts[6], self.contacts[56:63]), +# (self.contacts[7], self.contacts[64:71]), +# (self.contacts[8], self.contacts[72:79]), +# (self.contacts[40], self.contacts[41:48]), +# (self.contacts[41], self.contacts[41:48]), +# (self.contacts[42], self.contacts[41:48]), +# (self.contacts[43], self.contacts[41:48]), +# (self.contacts[44], self.contacts[41:48]), +# (self.contacts[45], self.contacts[41:48]), +# (self.contacts[46], self.contacts[41:48]), +# (self.contacts[47], self.contacts[41:48]), +# (self.contacts[48], self.contacts[41:48]), +# (self.contacts[50], self.contacts[0:7]), +# (self.contacts[51], self.contacts[8:15]), +# (self.contacts[52], self.contacts[16:23])) +# +# contacts_with_datastores = [] +# +# for contact_tuple in contactNetwork: +# contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], +# DictDataStore())) +# self._protocol.createNetwork(contacts_with_datastores) +# +# # @defer.inlineCallbacks +# # def testNodeBootStrap(self): +# # """ Test bootstrap with the closest possible contacts """ +# # # Set the expected result +# # expectedResult = {item.id for item in self.contacts[0:8]} +# # +# # activeContacts = yield self.node._iterativeFind(self.node.node_id, self.contacts[0:8]) +# # +# # # Check the length of the active contacts +# # self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), +# # "More active contacts should exist, there should be %d " +# # "contacts but there are %d" % (len(expectedResult), +# # len(activeContacts))) +# # +# # # Check that the received active contacts are the same as the input contacts +# # self.failUnlessEqual({contact.id for contact in activeContacts}, expectedResult, +# # "Active should only contain the closest possible contacts" +# # " which were used as input for the boostrap") diff --git a/lbrynet/tests/unit/dht/test_protocol.py b/lbrynet/tests/unit/dht/test_protocol.py index af636b631..02b6b5adb 100644 --- a/lbrynet/tests/unit/dht/test_protocol.py +++ b/lbrynet/tests/unit/dht/test_protocol.py @@ -1,200 +1,167 @@ -import time -import unittest -from twisted.internet.task import Clock -from twisted.internet import defer -import lbrynet.dht.protocol -import lbrynet.dht.contact -import lbrynet.dht.constants -import lbrynet.dht.msgtypes -from lbrynet.dht.error import TimeoutError -from lbrynet.dht.node import Node, rpcmethod -from lbrynet.tests.mocks import listenUDP, resolve -from lbrynet.core.call_later_manager import CallLaterManager - -import logging - -log = logging.getLogger() - - -class KademliaProtocolTest(unittest.TestCase): - """ Test case for the Protocol class """ - - udpPort = 9182 - - def setUp(self): - self._reactor = Clock() - CallLaterManager.setup(self._reactor.callLater) - self.node = Node(node_id='1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP, - resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) - - def tearDown(self): - CallLaterManager.stop() - del self._reactor - - @defer.inlineCallbacks - def testReactor(self): - """ Tests if the reactor can start/stop the protocol correctly """ - - d = defer.Deferred() - self._reactor.callLater(1, d.callback, True) - self._reactor.advance(1) - result = yield d - self.assertTrue(result) - - def testRPCTimeout(self): - """ Tests if a RPC message sent to a dead remote node times out correctly """ - dead_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, - resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) - dead_node.start_listening() - dead_node.stop() - self._reactor.pump([1 for _ in range(10)]) - dead_contact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) - self.node.addContact(dead_contact) - - @rpcmethod - def fake_ping(*args, **kwargs): - time.sleep(lbrynet.dht.constants.rpcTimeout + 1) - return 'pong' - - real_ping = self.node.ping - real_timeout = lbrynet.dht.constants.rpcTimeout - real_attempts = lbrynet.dht.constants.rpcAttempts - lbrynet.dht.constants.rpcAttempts = 1 - lbrynet.dht.constants.rpcTimeout = 1 - - self.node.ping = fake_ping - # Make sure the contact was added - self.failIf(dead_contact not in self.node.contacts, - 'Contact not added to fake node (error in test code)') - self.node.start_listening() - - # Run the PING RPC (which should raise a timeout error) - df = self.node._protocol.sendRPC(dead_contact, 'ping', {}) - - def check_timeout(err): - self.assertEqual(err.type, TimeoutError) - - df.addErrback(check_timeout) - - def reset_values(): - self.node.ping = real_ping - lbrynet.dht.constants.rpcTimeout = real_timeout - lbrynet.dht.constants.rpcAttempts = real_attempts - - # See if the contact was removed due to the timeout - def check_removed_contact(): - self.failIf(dead_contact in self.node.contacts, - 'Contact was not removed after RPC timeout; check exception types.') - - df.addCallback(lambda _: reset_values()) - - # Stop the reactor if a result arrives (timeout or not) - df.addCallback(lambda _: check_removed_contact()) - self._reactor.pump([1 for _ in range(20)]) - - def testRPCRequest(self): - """ Tests if a valid RPC request is executed and responded to correctly """ - - remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, - resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) - remote_node.start_listening() - remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) - self.node.addContact(remoteContact) - - self.error = None - - def handleError(f): - self.error = 'An RPC error occurred: %s' % f.getErrorMessage() - - def handleResult(result): - expectedResult = 'pong' - if result != expectedResult: - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \ - % (expectedResult, result) - - # Publish the "local" node on the network - self.node.start_listening() - # Simulate the RPC - df = remoteContact.ping() - df.addCallback(handleResult) - df.addErrback(handleError) - - for _ in range(10): - self._reactor.advance(1) - - self.failIf(self.error, self.error) - # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.node._protocol._sentMessages), 0, - 'The protocol is still waiting for a RPC result, ' - 'but the transaction is already done!') - - def testRPCAccess(self): - """ Tests invalid RPC requests - Verifies that a RPC request for an existing but unpublished - method is denied, and that the associated (remote) exception gets - raised locally """ - remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, - resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) - remote_node.start_listening() - remote_contact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) - self.node.addContact(remote_contact) - - self.error = None - - def handleError(f): - try: - f.raiseException() - except AttributeError, e: - # This is the expected outcome since the remote node did not publish the method - self.error = None - except Exception, e: - self.error = 'The remote method failed, but the wrong exception was raised; ' \ - 'expected AttributeError, got %s' % type(e) - - def handleResult(result): - self.error = 'The remote method executed successfully, returning: "%s"; ' \ - 'this RPC should not have been allowed.' % result - - self.node.start_listening() - self._reactor.pump([1 for _ in range(10)]) - # Simulate the RPC - df = remote_contact.not_a_rpc_function() - df.addCallback(handleResult) - df.addErrback(handleError) - self._reactor.pump([1 for _ in range(10)]) - self.failIf(self.error, self.error) - # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.node._protocol._sentMessages), 0, - 'The protocol is still waiting for a RPC result, ' - 'but the transaction is already done!') - - def testRPCRequestArgs(self): - """ Tests if an RPC requiring arguments is executed correctly """ - remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, - resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) - remote_node.start_listening() - remote_contact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol) - self.node.addContact(remote_contact) - self.error = None - - def handleError(f): - self.error = 'An RPC error occurred: %s' % f.getErrorMessage() - - def handleResult(result): - expectedResult = 'pong' - if result != expectedResult: - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \ - (expectedResult, result) - - # Publish the "local" node on the network - self.node.start_listening() - # Simulate the RPC - df = remote_contact.ping() - df.addCallback(handleResult) - df.addErrback(handleError) - self._reactor.pump([1 for _ in range(10)]) - self.failIf(self.error, self.error) - # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.node._protocol._sentMessages), 0, - 'The protocol is still waiting for a RPC result, ' - 'but the transaction is already done!') +# import time +# import unittest +# import twisted.internet.selectreactor +# +# import lbrynet.dht.protocol +# import lbrynet.dht.contact +# import lbrynet.dht.constants +# import lbrynet.dht.msgtypes +# from lbrynet.dht.error import TimeoutError +# from lbrynet.dht.node import Node, rpcmethod +# +# +# class KademliaProtocolTest(unittest.TestCase): +# """ Test case for the Protocol class """ +# +# def setUp(self): +# del lbrynet.dht.protocol.reactor +# lbrynet.dht.protocol.reactor = twisted.internet.selectreactor.SelectReactor() +# self.node = Node(node_id='1' * 48, udpPort=9182, externalIP="127.0.0.1") +# self.protocol = lbrynet.dht.protocol.KademliaProtocol(self.node) +# +# def testReactor(self): +# """ Tests if the reactor can start/stop the protocol correctly """ +# lbrynet.dht.protocol.reactor.listenUDP(0, self.protocol) +# lbrynet.dht.protocol.reactor.callLater(0, lbrynet.dht.protocol.reactor.stop) +# lbrynet.dht.protocol.reactor.run() +# +# def testRPCTimeout(self): +# """ Tests if a RPC message sent to a dead remote node times out correctly """ +# +# @rpcmethod +# def fake_ping(*args, **kwargs): +# time.sleep(lbrynet.dht.constants.rpcTimeout + 1) +# return 'pong' +# +# real_ping = self.node.ping +# real_timeout = lbrynet.dht.constants.rpcTimeout +# real_attempts = lbrynet.dht.constants.rpcAttempts +# lbrynet.dht.constants.rpcAttempts = 1 +# lbrynet.dht.constants.rpcTimeout = 1 +# self.node.ping = fake_ping +# deadContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) +# self.node.addContact(deadContact) +# # Make sure the contact was added +# self.failIf(deadContact not in self.node.contacts, +# 'Contact not added to fake node (error in test code)') +# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) +# +# # Run the PING RPC (which should raise a timeout error) +# df = self.protocol.sendRPC(deadContact, 'ping', {}) +# +# def check_timeout(err): +# self.assertEqual(type(err), TimeoutError) +# +# df.addErrback(check_timeout) +# +# def reset_values(): +# self.node.ping = real_ping +# lbrynet.dht.constants.rpcTimeout = real_timeout +# lbrynet.dht.constants.rpcAttempts = real_attempts +# +# # See if the contact was removed due to the timeout +# def check_removed_contact(): +# self.failIf(deadContact in self.node.contacts, +# 'Contact was not removed after RPC timeout; check exception types.') +# +# df.addCallback(lambda _: reset_values()) +# +# # Stop the reactor if a result arrives (timeout or not) +# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) +# df.addCallback(lambda _: check_removed_contact()) +# lbrynet.dht.protocol.reactor.run() +# +# def testRPCRequest(self): +# """ Tests if a valid RPC request is executed and responded to correctly """ +# remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) +# self.node.addContact(remoteContact) +# self.error = None +# +# def handleError(f): +# self.error = 'An RPC error occurred: %s' % f.getErrorMessage() +# +# def handleResult(result): +# expectedResult = 'pong' +# if result != expectedResult: +# self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \ +# % (expectedResult, result) +# +# # Publish the "local" node on the network +# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) +# # Simulate the RPC +# df = remoteContact.ping() +# df.addCallback(handleResult) +# df.addErrback(handleError) +# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) +# lbrynet.dht.protocol.reactor.run() +# self.failIf(self.error, self.error) +# # The list of sent RPC messages should be empty at this stage +# self.failUnlessEqual(len(self.protocol._sentMessages), 0, +# 'The protocol is still waiting for a RPC result, ' +# 'but the transaction is already done!') +# +# def testRPCAccess(self): +# """ Tests invalid RPC requests +# Verifies that a RPC request for an existing but unpublished +# method is denied, and that the associated (remote) exception gets +# raised locally """ +# remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) +# self.node.addContact(remoteContact) +# self.error = None +# +# def handleError(f): +# try: +# f.raiseException() +# except AttributeError, e: +# # This is the expected outcome since the remote node did not publish the method +# self.error = None +# except Exception, e: +# self.error = 'The remote method failed, but the wrong exception was raised; ' \ +# 'expected AttributeError, got %s' % type(e) +# +# def handleResult(result): +# self.error = 'The remote method executed successfully, returning: "%s"; ' \ +# 'this RPC should not have been allowed.' % result +# +# # Publish the "local" node on the network +# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) +# # Simulate the RPC +# df = remoteContact.not_a_rpc_function() +# df.addCallback(handleResult) +# df.addErrback(handleError) +# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) +# lbrynet.dht.protocol.reactor.run() +# self.failIf(self.error, self.error) +# # The list of sent RPC messages should be empty at this stage +# self.failUnlessEqual(len(self.protocol._sentMessages), 0, +# 'The protocol is still waiting for a RPC result, ' +# 'but the transaction is already done!') +# +# def testRPCRequestArgs(self): +# """ Tests if an RPC requiring arguments is executed correctly """ +# remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) +# self.node.addContact(remoteContact) +# self.error = None +# +# def handleError(f): +# self.error = 'An RPC error occurred: %s' % f.getErrorMessage() +# +# def handleResult(result): +# expectedResult = 'pong' +# if result != expectedResult: +# self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \ +# (expectedResult, result) +# +# # Publish the "local" node on the network +# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) +# # Simulate the RPC +# df = remoteContact.ping() +# df.addCallback(handleResult) +# df.addErrback(handleError) +# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) +# lbrynet.dht.protocol.reactor.run() +# self.failIf(self.error, self.error) +# # The list of sent RPC messages should be empty at this stage +# self.failUnlessEqual(len(self.protocol._sentMessages), 0, +# 'The protocol is still waiting for a RPC result, ' +# 'but the transaction is already done!') diff --git a/lbrynet/tests/unit/dht/test_routingtable.py b/lbrynet/tests/unit/dht/test_routingtable.py index 8c0907509..1c6e48098 100644 --- a/lbrynet/tests/unit/dht/test_routingtable.py +++ b/lbrynet/tests/unit/dht/test_routingtable.py @@ -1,29 +1,16 @@ import hashlib -import unittest - -import lbrynet.dht.constants -import lbrynet.dht.routingtable -import lbrynet.dht.contact -import lbrynet.dht.node -import lbrynet.dht.distance +from twisted.trial import unittest +from twisted.internet import defer +from lbrynet.dht import constants +from lbrynet.dht.routingtable import TreeRoutingTable +from lbrynet.dht.contact import ContactManager +from lbrynet.dht.distance import Distance class FakeRPCProtocol(object): """ Fake RPC protocol; allows lbrynet.dht.contact.Contact objects to "send" RPCs """ def sendRPC(self, *args, **kwargs): - return FakeDeferred() - - -class FakeDeferred(object): - """ Fake Twisted Deferred object; allows the routing table to add callbacks that do nothing """ - def addCallback(self, *args, **kwargs): - return - - def addErrback(self, *args, **kwargs): - return - - def addCallbacks(self, *args, **kwargs): - return + return defer.succeed(None) class TreeRoutingTableTest(unittest.TestCase): @@ -31,9 +18,10 @@ class TreeRoutingTableTest(unittest.TestCase): def setUp(self): h = hashlib.sha384() h.update('node1') + self.contact_manager = ContactManager() self.nodeID = h.digest() self.protocol = FakeRPCProtocol() - self.routingTable = lbrynet.dht.routingtable.TreeRoutingTable(self.nodeID) + self.routingTable = TreeRoutingTable(self.nodeID) def testDistance(self): """ Test to see if distance method returns correct result""" @@ -42,86 +30,91 @@ class TreeRoutingTableTest(unittest.TestCase): basicTestList = [('123456789', '123456789', 0L), ('12345', '98765', 34527773184L)] for test in basicTestList: - result = lbrynet.dht.distance.Distance(test[0])(test[1]) + result = Distance(test[0])(test[1]) self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' % (test[2], result)) baseIp = '146.64.19.111' ipTestList = ['146.64.29.222', '192.68.19.333'] - distanceOne = lbrynet.dht.distance.Distance(baseIp)(ipTestList[0]) - distanceTwo = lbrynet.dht.distance.Distance(baseIp)(ipTestList[1]) + distanceOne = Distance(baseIp)(ipTestList[0]) + distanceTwo = Distance(baseIp)(ipTestList[1]) self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' % (ipTestList[0], baseIp, ipTestList[1])) + @defer.inlineCallbacks def testAddContact(self): """ Tests if a contact can be added and retrieved correctly """ # Create the contact h = hashlib.sha384() h.update('node2') contactID = h.digest() - contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) + contact = self.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.protocol) # Now add it... - self.routingTable.addContact(contact) + yield self.routingTable.addContact(contact) # ...and request the closest nodes to it (will retrieve it) - closestNodes = self.routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k) + closestNodes = self.routingTable.findCloseNodes(contactID, constants.k) self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,' ' got %d' % len(closestNodes)) self.failUnless(contact in closestNodes, 'Added contact not found by issueing ' '_findCloseNodes()') + @defer.inlineCallbacks def testGetContact(self): """ Tests if a specific existing contact can be retrieved correctly """ h = hashlib.sha384() h.update('node2') contactID = h.digest() - contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) + contact = self.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.protocol) # Now add it... - self.routingTable.addContact(contact) + yield self.routingTable.addContact(contact) # ...and get it again sameContact = self.routingTable.getContact(contactID) self.failUnlessEqual(contact, sameContact, 'getContact() should return the same contact') + @defer.inlineCallbacks def testAddParentNodeAsContact(self): """ Tests the routing table's behaviour when attempting to add its parent node as a contact """ # Create a contact with the same ID as the local node's ID - contact = lbrynet.dht.contact.Contact(self.nodeID, '127.0.0.1', 91824, self.protocol) + contact = self.contact_manager.make_contact(self.nodeID, '127.0.0.1', 91824, self.protocol) # Now try to add it - self.routingTable.addContact(contact) + yield self.routingTable.addContact(contact) # ...and request the closest nodes to it using FIND_NODE - closestNodes = self.routingTable.findCloseNodes(self.nodeID, lbrynet.dht.constants.k) + closestNodes = self.routingTable.findCloseNodes(self.nodeID, constants.k) self.failIf(contact in closestNodes, 'Node added itself as a contact') + @defer.inlineCallbacks def testRemoveContact(self): """ Tests contact removal """ # Create the contact h = hashlib.sha384() h.update('node2') contactID = h.digest() - contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) + contact = self.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.protocol) # Now add it... - self.routingTable.addContact(contact) + yield self.routingTable.addContact(contact) # Verify addition self.failUnlessEqual(len(self.routingTable._buckets[0]), 1, 'Contact not added properly') # Now remove it - self.routingTable.removeContact(contact.id) + self.routingTable.removeContact(contact) self.failUnlessEqual(len(self.routingTable._buckets[0]), 0, 'Contact not removed properly') + @defer.inlineCallbacks def testSplitBucket(self): """ Tests if the the routing table correctly dynamically splits k-buckets """ self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**384, 'Initial k-bucket range should be 0 <= range < 2**384') # Add k contacts - for i in range(lbrynet.dht.constants.k): + for i in range(constants.k): h = hashlib.sha384() h.update('remote node %d' % i) nodeID = h.digest() - contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) - self.routingTable.addContact(contact) + contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol) + yield self.routingTable.addContact(contact) self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now ' 'be full, but should not yet be split') @@ -129,8 +122,8 @@ class TreeRoutingTableTest(unittest.TestCase): h = hashlib.sha384() h.update('yet another remote node') nodeID = h.digest() - contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) - self.routingTable.addContact(contact) + contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol) + yield self.routingTable.addContact(contact) self.failUnlessEqual(len(self.routingTable._buckets), 2, 'k+1 nodes have been added; the first k-bucket should have been ' 'split into two new buckets') @@ -144,99 +137,113 @@ class TreeRoutingTableTest(unittest.TestCase): 'K-bucket was split, but the min/max ranges were ' 'not divided properly') - def testFullBucketNoSplit(self): + @defer.inlineCallbacks + def testFullSplit(self): """ Test that a bucket is not split if it full, but does not cover the range containing the parent node's ID """ + self.routingTable._parentNodeID = 49 * 'a' # more than 384 bits; this will not be in the range of _any_ k-bucket + + node_ids = [ + "d4a27096d81e3c4efacce9f940e887c956f736f859c8037b556efec6fdda5c388ae92bae96b9eb204b24da2f376c4282", + "553c0bfe119c35247c8cb8124091acb5c05394d5be7b019f6b1a5e18036af7a6148711ad6d47a0f955047bf9eac868aa", + "671a179c251c90863f46e7ef54264cbbad743fe3127871064d8f051ce4124fcbd893339e11358f621655e37bd6a74097", + "f896bafeb7ffb14b92986e3b08ee06807fdd5be34ab43f4f52559a5bbf0f12dedcd8556801f97c334b3ac9be7a0f7a93", + "33a7deb380eb4707211184798b66840c22c396e8cde00b75b64f9ead09bad1141b56d35a93bd511adb28c6708eecc39d", + "5e1e8ca575b536ae5ec52f7766ada904a64ebaad805909b1067ec3c984bf99909c9fcdd37e04ea5c5c043ea8830100ce", + "ee18857d0c1f7fc413424f3ffead4871f2499646d4c2ac16f35f0c8864318ca21596915f18f85a3a25f8ceaa56c844aa", + "68039f78fbf130873e7cce2f71f39d217dcb7f3fe562d64a85de4e21ee980b4a800f51bf6851d2bbf10e6590fe0d46b2" + ] + # Add k contacts - for i in range(lbrynet.dht.constants.k): + for i in range(constants.k): h = hashlib.sha384() h.update('remote node %d' % i) nodeID = h.digest() - contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) - self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; ' - 'the first k-bucket should now be ' - 'full, and there should not be ' - 'more than 1 bucket') - self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, - 'Bucket should have k contacts; expected %d got %d' % - (lbrynet.dht.constants.k, - len(self.routingTable._buckets[0]._contacts))) - # Now add 1 more contact + self.assertEquals(nodeID, node_ids[i].decode('hex')) + contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol) + yield self.routingTable.addContact(contact) + self.failUnlessEqual(len(self.routingTable._buckets), 1) + self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), constants.k) + + # try adding a contact who is further from us than the k'th known contact + h = hashlib.sha384() + h.update('yet another remote node!') + nodeID = h.digest() + contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol) + yield self.routingTable.addContact(contact) + self.failUnlessEqual(len(self.routingTable._buckets), 1) + self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), constants.k) + self.failIf(contact in self.routingTable._buckets[0]._contacts) + + # try adding a contact who is closer to us than the k'th known contact h = hashlib.sha384() h.update('yet another remote node') nodeID = h.digest() - contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) - self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, - 'There should not be more than 1 bucket, since the bucket ' - 'should not have been split (parent node ID not in range)') - self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), - lbrynet.dht.constants.k, 'Bucket should have k contacts; ' - 'expected %d got %d' % - (lbrynet.dht.constants.k, - len(self.routingTable._buckets[0]._contacts))) - self.failIf(contact in self.routingTable._buckets[0]._contacts, - 'New contact should have been discarded (since RPC is faked in this test)') + contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol) + yield self.routingTable.addContact(contact) + self.failUnlessEqual(len(self.routingTable._buckets), 2) + self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), 5) + self.failUnlessEqual(len(self.routingTable._buckets[1]._contacts), 4) + self.failIf(contact not in self.routingTable._buckets[1]._contacts) -class KeyErrorFixedTest(unittest.TestCase): - """ Basic tests case for boolean operators on the Contact class """ - - def setUp(self): - own_id = (2 ** lbrynet.dht.constants.key_bits) - 1 - # carefully chosen own_id. here's the logic - # we want a bunch of buckets (k+1, to be exact), and we want to make sure own_id - # is not in bucket 0. so we put own_id at the end so we can keep splitting by adding to the - # end - - self.table = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(own_id) - - def fill_bucket(self, bucket_min): - bucket_size = lbrynet.dht.constants.k - for i in range(bucket_min, bucket_min + bucket_size): - self.table.addContact(lbrynet.dht.contact.Contact(long(i), '127.0.0.1', 9999, None)) - - def overflow_bucket(self, bucket_min): - bucket_size = lbrynet.dht.constants.k - self.fill_bucket(bucket_min) - self.table.addContact( - lbrynet.dht.contact.Contact(long(bucket_min + bucket_size + 1), - '127.0.0.1', 9999, None)) - - def testKeyError(self): - - # find middle, so we know where bucket will split - bucket_middle = self.table._buckets[0].rangeMax / 2 - - # fill last bucket - self.fill_bucket(self.table._buckets[0].rangeMax - lbrynet.dht.constants.k - 1) - # -1 in previous line because own_id is in last bucket - - # fill/overflow 7 more buckets - bucket_start = 0 - for i in range(0, lbrynet.dht.constants.k): - self.overflow_bucket(bucket_start) - bucket_start += bucket_middle / (2 ** i) - - # replacement cache now has k-1 entries. - # adding one more contact to bucket 0 used to cause a KeyError, but it should work - self.table.addContact( - lbrynet.dht.contact.Contact(long(lbrynet.dht.constants.k + 2), '127.0.0.1', 9999, None)) - - # import math - # print "" - # for i, bucket in enumerate(self.table._buckets): - # print "Bucket " + str(i) + " (2 ** " + str( - # math.log(bucket.rangeMin, 2) if bucket.rangeMin > 0 else 0) + " <= x < 2 ** "+str( - # math.log(bucket.rangeMax, 2)) + ")" - # for c in bucket.getContacts(): - # print " contact " + str(c.id) - # for key, bucket in self.table._replacementCache.iteritems(): - # print "Replacement Cache for Bucket " + str(key) - # for c in bucket: - # print " contact " + str(c.id) +# class KeyErrorFixedTest(unittest.TestCase): +# """ Basic tests case for boolean operators on the Contact class """ +# +# def setUp(self): +# own_id = (2 ** constants.key_bits) - 1 +# # carefully chosen own_id. here's the logic +# # we want a bunch of buckets (k+1, to be exact), and we want to make sure own_id +# # is not in bucket 0. so we put own_id at the end so we can keep splitting by adding to the +# # end +# +# self.table = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(own_id) +# +# def fill_bucket(self, bucket_min): +# bucket_size = lbrynet.dht.constants.k +# for i in range(bucket_min, bucket_min + bucket_size): +# self.table.addContact(lbrynet.dht.contact.Contact(long(i), '127.0.0.1', 9999, None)) +# +# def overflow_bucket(self, bucket_min): +# bucket_size = lbrynet.dht.constants.k +# self.fill_bucket(bucket_min) +# self.table.addContact( +# lbrynet.dht.contact.Contact(long(bucket_min + bucket_size + 1), +# '127.0.0.1', 9999, None)) +# +# def testKeyError(self): +# +# # find middle, so we know where bucket will split +# bucket_middle = self.table._buckets[0].rangeMax / 2 +# +# # fill last bucket +# self.fill_bucket(self.table._buckets[0].rangeMax - lbrynet.dht.constants.k - 1) +# # -1 in previous line because own_id is in last bucket +# +# # fill/overflow 7 more buckets +# bucket_start = 0 +# for i in range(0, lbrynet.dht.constants.k): +# self.overflow_bucket(bucket_start) +# bucket_start += bucket_middle / (2 ** i) +# +# # replacement cache now has k-1 entries. +# # adding one more contact to bucket 0 used to cause a KeyError, but it should work +# self.table.addContact( +# lbrynet.dht.contact.Contact(long(lbrynet.dht.constants.k + 2), '127.0.0.1', 9999, None)) +# +# # import math +# # print "" +# # for i, bucket in enumerate(self.table._buckets): +# # print "Bucket " + str(i) + " (2 ** " + str( +# # math.log(bucket.rangeMin, 2) if bucket.rangeMin > 0 else 0) + " <= x < 2 ** "+str( +# # math.log(bucket.rangeMax, 2)) + ")" +# # for c in bucket.getContacts(): +# # print " contact " + str(c.id) +# # for key, bucket in self.table._replacementCache.iteritems(): +# # print "Replacement Cache for Bucket " + str(key) +# # for c in bucket: +# # print " contact " + str(c.id) diff --git a/lbrynet/tests/util.py b/lbrynet/tests/util.py index e6ad2005c..68b445c8e 100644 --- a/lbrynet/tests/util.py +++ b/lbrynet/tests/util.py @@ -5,20 +5,11 @@ import os import tempfile import shutil import mock -import logging -from lbrynet.dht.encoding import Bencode -from lbrynet.dht.error import DecodeError -from lbrynet.dht.msgformat import DefaultFormat -from lbrynet.dht.msgtypes import ResponseMessage, RequestMessage, ErrorMessage -_encode = Bencode() -_datagram_formatter = DefaultFormat() DEFAULT_TIMESTAMP = datetime.datetime(2016, 1, 1) DEFAULT_ISO_TIME = time.mktime(DEFAULT_TIMESTAMP.timetuple()) -log = logging.getLogger("lbrynet.tests.util") - def mk_db_and_blob_dir(): db_dir = tempfile.mkdtemp() @@ -49,28 +40,5 @@ def resetTime(test_case, timestamp=DEFAULT_TIMESTAMP): patcher.start().return_value = timestamp test_case.addCleanup(patcher.stop) - def is_android(): return 'ANDROID_ARGUMENT' in os.environ # detect Android using the Kivy way - - -def debug_kademlia_packet(data, source, destination, node): - if log.level != logging.DEBUG: - return - try: - packet = _datagram_formatter.fromPrimitive(_encode.decode(data)) - if isinstance(packet, RequestMessage): - log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request, - node.clock.seconds()) - elif isinstance(packet, ResponseMessage): - if isinstance(packet.response, (str, unicode)): - log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response, - node.clock.seconds()) - else: - log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0], - len(packet.response), node.clock.seconds()) - elif isinstance(packet, ErrorMessage): - log.error("error %s <-- %s %s (node time %s)", destination[0], source[0], packet.exceptionType, - node.clock.seconds()) - except DecodeError: - log.exception("decode error %s --> %s (node time %s)", source[0], destination[0], node.clock.seconds())