update mocks and dht tests

-reorganize dht tests
Jack Robison 2018-05-23 18:28:22 -04:00
parent e1079a0c0f
commit 950ec5bc9a
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
17 changed files with 1097 additions and 1032 deletions

@@ -0,0 +1,174 @@
import logging
from twisted.trial import unittest
from twisted.internet import defer, task
from lbrynet.dht.node import Node
from mock_transport import resolve, listenUDP, MOCK_DHT_SEED_DNS, mock_node_generator
log = logging.getLogger(__name__)
class TestKademliaBase(unittest.TestCase):
timeout = 300.0 # timeout for each test
network_size = 16 # including seed nodes
node_ids = None
seed_dns = MOCK_DHT_SEED_DNS
def _add_next_node(self):
node_id, node_ip = self.mock_node_generator.next()
node = Node(node_id=node_id.decode('hex'), udpPort=4444, peerPort=3333, externalIP=node_ip,
resolve=resolve, listenUDP=listenUDP, callLater=self.clock.callLater, clock=self.clock)
self.nodes.append(node)
return node
@defer.inlineCallbacks
def add_node(self):
node = self._add_next_node()
yield node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])
defer.returnValue(node)
def get_node(self, node_id):
for node in self.nodes:
if node.node_id == node_id:
return node
raise KeyError(node_id)
@defer.inlineCallbacks
def pop_node(self):
node = self.nodes.pop()
yield node.stop()
def pump_clock(self, n, step=0.1, tick_callback=None):
"""
:param n: seconds to run the reactor for
:param step: reactor tick rate (in seconds)
"""
for _ in range(int(n * (1.0 / float(step)))):
self.clock.advance(step)
if tick_callback and callable(tick_callback):
tick_callback(self.clock.seconds())
def run_reactor(self, seconds, deferreds, tick_callback=None):
d = defer.DeferredList(deferreds)
self.pump_clock(seconds, tick_callback=tick_callback)
return d
def get_contacts(self):
contacts = {}
for seed in self._seeds:
contacts[seed] = seed.contacts
for node in self.nodes:
contacts[node] = node.contacts
return contacts
def get_routable_addresses(self):
known = set()
for n in self._seeds:
known.update([(c.id, c.address, c.port) for c in n.contacts])
for n in self.nodes:
known.update([(c.id, c.address, c.port) for c in n.contacts])
addresses = {triple[1] for triple in known}
return addresses
def get_online_addresses(self):
online = set()
for n in self._seeds:
online.add(n.externalIP)
for n in self.nodes:
online.add(n.externalIP)
return online
def show_info(self):
known = set()
for n in self._seeds:
known.update([(c.id, c.address, c.port) for c in n.contacts])
for n in self.nodes:
known.update([(c.id, c.address, c.port) for c in n.contacts])
log.info("Routable: %i/%i", len(known), len(self.nodes) + len(self._seeds))
for n in self._seeds:
log.info("seed %s has %i contacts in %i buckets", n.externalIP, len(n.contacts),
len([b for b in n._routingTable._buckets if b.getContacts()]))
for n in self.nodes:
log.info("node %s has %i contacts in %i buckets", n.externalIP, len(n.contacts),
len([b for b in n._routingTable._buckets if b.getContacts()]))
@defer.inlineCallbacks
def setUp(self):
self.nodes = []
self._seeds = []
self.clock = task.Clock()
self.mock_node_generator = mock_node_generator(mock_node_ids=self.node_ids)
seed_dl = []
seeds = sorted(list(self.seed_dns.keys()))
known_addresses = [(seed_name, 4444) for seed_name in seeds]
for seed_dns in seeds:
self._add_next_node()
seed = self.nodes.pop()
self._seeds.append(seed)
seed_dl.append(
seed.start(known_addresses)
)
yield self.run_reactor(901, seed_dl)
while len(self.nodes + self._seeds) < self.network_size:
network_dl = []
for i in range(min(10, self.network_size - len(self._seeds) - len(self.nodes))):
network_dl.append(self.add_node())
yield self.run_reactor(31, network_dl)
self.assertEqual(len(self.nodes + self._seeds), self.network_size)
self.pump_clock(1800)
self.verify_all_nodes_are_routable()
self.verify_all_nodes_are_pingable()
@defer.inlineCallbacks
def tearDown(self):
dl = []
while self.nodes:
dl.append(self.pop_node()) # stop all of the nodes
while self._seeds:
dl.append(self._seeds.pop().stop()) # and the seeds
yield defer.DeferredList(dl)
def verify_all_nodes_are_routable(self):
routable = set()
node_addresses = {node.externalIP for node in self.nodes}
node_addresses = node_addresses.union({node.externalIP for node in self._seeds})
for node in self._seeds:
contact_addresses = {contact.address for contact in node.contacts}
routable.update(contact_addresses)
for node in self.nodes:
contact_addresses = {contact.address for contact in node.contacts}
routable.update(contact_addresses)
self.assertSetEqual(routable, node_addresses)
@defer.inlineCallbacks
def verify_all_nodes_are_pingable(self):
ping_replies = {}
ping_dl = []
contacted = set()
def _ping_cb(result, node, replies):
replies[node] = result
for node in self._seeds:
contact_addresses = set()
for contact in node.contacts:
contact_addresses.add(contact.address)
d = contact.ping()
d.addCallback(_ping_cb, contact.address, ping_replies)
contacted.add(contact.address)
ping_dl.append(d)
for node in self.nodes:
contact_addresses = set()
for contact in node.contacts:
contact_addresses.add(contact.address)
d = contact.ping()
d.addCallback(_ping_cb, contact.address, ping_replies)
contacted.add(contact.address)
ping_dl.append(d)
yield self.run_reactor(2, ping_dl)
node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds})
self.assertSetEqual(node_addresses, contacted)
expected = {node: "pong" for node in contacted}
self.assertDictEqual(ping_replies, expected)
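
For reference, pump_clock and run_reactor work because twisted.internet.task.Clock stands in for the real reactor: scheduled calls only fire as the fake clock is advanced. A minimal, self-contained sketch of that pattern (independent of the DHT classes above):

from twisted.internet import defer, task

clock = task.Clock()
d = defer.Deferred()
# schedule a callback 5 simulated seconds in the future
clock.callLater(5, d.callback, "done")

# advance the fake clock in 0.1s ticks, the same loop pump_clock(5, step=0.1) runs
for _ in range(50):
    clock.advance(0.1)

assert d.called  # the deferred fired without a real reactor or any real waiting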

@@ -0,0 +1,149 @@
import struct
import logging
from twisted.internet import defer, error
from lbrynet.core.utils import generate_id
from lbrynet.dht.encoding import Bencode
from lbrynet.dht.error import DecodeError
from lbrynet.dht.msgformat import DefaultFormat
from lbrynet.dht.msgtypes import ResponseMessage, RequestMessage, ErrorMessage
_encode = Bencode()
_datagram_formatter = DefaultFormat()
log = logging.getLogger()
MOCK_DHT_NODES = [
"cc8db9d0dd9b65b103594b5f992adf09f18b310958fa451d61ce8d06f3ee97a91461777c2b7dea1a89d02d2f23eb0e4f",
"83a3a398eead3f162fbbe1afb3d63482bb5b6d3cdd8f9b0825c1dfa58dffd3f6f6026d6e64d6d4ae4c3dfe2262e734ba",
"b6928ff25778a7bbb5d258d3b3a06e26db1654f3d2efce8c26681d43f7237cdf2e359a4d309c4473d5d89ec99fb4f573",
]
MOCK_DHT_SEED_DNS = { # these map to mock nodes 0, 1, and 2
"lbrynet1.lbry.io": "10.42.42.1",
"lbrynet2.lbry.io": "10.42.42.2",
"lbrynet3.lbry.io": "10.42.42.3",
"lbrynet4.lbry.io": "10.42.42.4",
"lbrynet5.lbry.io": "10.42.42.5",
"lbrynet6.lbry.io": "10.42.42.6",
"lbrynet7.lbry.io": "10.42.42.7",
"lbrynet8.lbry.io": "10.42.42.8",
"lbrynet9.lbry.io": "10.42.42.9",
"lbrynet10.lbry.io": "10.42.42.10",
"lbrynet11.lbry.io": "10.42.42.11",
"lbrynet12.lbry.io": "10.42.42.12",
"lbrynet13.lbry.io": "10.42.42.13",
"lbrynet14.lbry.io": "10.42.42.14",
"lbrynet15.lbry.io": "10.42.42.15",
"lbrynet16.lbry.io": "10.42.42.16",
}
def resolve(name, timeout=(1, 3, 11, 45)):
if name not in MOCK_DHT_SEED_DNS:
return defer.fail(error.DNSLookupError(name))
return defer.succeed(MOCK_DHT_SEED_DNS[name])
class MockUDPTransport(object):
def __init__(self, address, port, max_packet_size, protocol):
self.address = address
self.port = port
self.max_packet_size = max_packet_size
self._node = protocol._node
def write(self, data, address):
if address in MockNetwork.peers:
dest = MockNetwork.peers[address][0]
debug_kademlia_packet(data, (self.address, self.port), address, self._node)
dest.datagramReceived(data, (self.address, self.port))
else:  # the node is sending to an address that doesn't currently exist; act as if the packet never arrived
pass
class MockUDPPort(object):
def __init__(self, protocol, remover):
self.protocol = protocol
self._remover = remover
def startListening(self, reason=None):
return self.protocol.startProtocol()
def stopListening(self, reason=None):
result = self.protocol.stopProtocol()
self._remover()
return result
class MockNetwork(object):
peers = {} # (interface, port): (protocol, max_packet_size)
@classmethod
def add_peer(cls, port, protocol, interface, maxPacketSize):
interface = protocol._node.externalIP
protocol.transport = MockUDPTransport(interface, port, maxPacketSize, protocol)
cls.peers[(interface, port)] = (protocol, maxPacketSize)
def remove_peer():
del protocol.transport
if (interface, port) in cls.peers:
del cls.peers[(interface, port)]
return remove_peer
def listenUDP(port, protocol, interface='', maxPacketSize=8192):
remover = MockNetwork.add_peer(port, protocol, interface, maxPacketSize)
port = MockUDPPort(protocol, remover)
port.startListening()
return port
def address_generator(address=(10, 42, 42, 1)):
def increment(addr):
value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]))[0] + 1
new_addr = []
for i in range(4):
new_addr.append(value % 256)
value >>= 8
return tuple(new_addr[::-1])
while True:
yield "{}.{}.{}.{}".format(*address)
address = increment(address)
def mock_node_generator(count=None, mock_node_ids=MOCK_DHT_NODES):
if mock_node_ids is None:
mock_node_ids = MOCK_DHT_NODES
mock_node_ids = list(mock_node_ids)
for num, node_ip in enumerate(address_generator()):
if count and num >= count:
break
if num >= len(mock_node_ids):
node_id = generate_id().encode('hex')
else:
node_id = mock_node_ids[num]
yield (node_id, node_ip)
def debug_kademlia_packet(data, source, destination, node):
if log.level != logging.DEBUG:
return
try:
packet = _datagram_formatter.fromPrimitive(_encode.decode(data))
if isinstance(packet, RequestMessage):
log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request,
node.clock.seconds())
elif isinstance(packet, ResponseMessage):
if isinstance(packet.response, (str, unicode)):
log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response,
node.clock.seconds())
else:
log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0],
len(packet.response), node.clock.seconds())
elif isinstance(packet, ErrorMessage):
log.error("error %s <-- %s %s (node time %s)", destination[0], source[0], packet.exceptionType,
node.clock.seconds())
except DecodeError:
log.exception("decode error %s --> %s (node time %s)", source[0], destination[0], node.clock.seconds())
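
A rough sketch of how this in-memory transport can be wired up. The _FakeNode and _EchoProtocol stubs below are illustrative stand-ins, not part of the module; they only provide what listenUDP() and MockUDPTransport expect from a protocol object (_node.externalIP, startProtocol/stopProtocol and datagramReceived):

class _FakeNode(object):
    def __init__(self, external_ip):
        self.externalIP = external_ip

class _EchoProtocol(object):
    """Minimal protocol stub that records every datagram it receives."""
    def __init__(self, external_ip):
        self._node = _FakeNode(external_ip)
        self.received = []

    def startProtocol(self):
        pass

    def stopProtocol(self):
        pass

    def datagramReceived(self, data, address):
        self.received.append((data, address))

proto_a, proto_b = _EchoProtocol("10.42.42.1"), _EchoProtocol("10.42.42.2")
port_a, port_b = listenUDP(4444, proto_a), listenUDP(4444, proto_b)

# writes are routed in memory through MockNetwork.peers; no real sockets are used
proto_a.transport.write("hello", ("10.42.42.2", 4444))
assert proto_b.received == [("hello", ("10.42.42.1", 4444))]

port_a.stopListening()
port_b.stopListening()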

@@ -0,0 +1,10 @@
from dht_test_environment import TestKademliaBase
class TestKademliaBootstrap(TestKademliaBase):
"""
Test initializing the network / connecting the seed nodes
"""
def test_bootstrap_seed_nodes(self):
pass

@@ -0,0 +1,200 @@
import time
import unittest
import logging
from twisted.internet.task import Clock
from twisted.internet import defer
import lbrynet.dht.protocol
import lbrynet.dht.contact
import lbrynet.dht.constants
import lbrynet.dht.msgtypes
from lbrynet.dht.error import TimeoutError
from lbrynet.dht.node import Node, rpcmethod
from lbrynet.core.call_later_manager import CallLaterManager
from mock_transport import listenUDP, resolve
log = logging.getLogger()
class KademliaProtocolTest(unittest.TestCase):
""" Test case for the Protocol class """
udpPort = 9182
def setUp(self):
self._reactor = Clock()
CallLaterManager.setup(self._reactor.callLater)
self.node = Node(node_id='1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
def tearDown(self):
CallLaterManager.stop()
del self._reactor
@defer.inlineCallbacks
def testReactor(self):
""" Tests if the reactor can start/stop the protocol correctly """
d = defer.Deferred()
self._reactor.callLater(1, d.callback, True)
self._reactor.advance(1)
result = yield d
self.assertTrue(result)
def testRPCTimeout(self):
""" Tests if a RPC message sent to a dead remote node times out correctly """
dead_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
dead_node.start_listening()
dead_node.stop()
self._reactor.pump([1 for _ in range(10)])
dead_contact = self.node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(dead_contact)
@rpcmethod
def fake_ping(*args, **kwargs):
time.sleep(lbrynet.dht.constants.rpcTimeout + 1)
return 'pong'
real_ping = self.node.ping
real_timeout = lbrynet.dht.constants.rpcTimeout
real_attempts = lbrynet.dht.constants.rpcAttempts
lbrynet.dht.constants.rpcAttempts = 1
lbrynet.dht.constants.rpcTimeout = 1
self.node.ping = fake_ping
# Make sure the contact was added
self.failIf(dead_contact not in self.node.contacts,
'Contact not added to fake node (error in test code)')
self.node.start_listening()
# Run the PING RPC (which should raise a timeout error)
df = self.node._protocol.sendRPC(dead_contact, 'ping', {})
def check_timeout(err):
self.assertEqual(err.type, TimeoutError)
df.addErrback(check_timeout)
def reset_values():
self.node.ping = real_ping
lbrynet.dht.constants.rpcTimeout = real_timeout
lbrynet.dht.constants.rpcAttempts = real_attempts
# See if the contact was removed due to the timeout
def check_removed_contact():
self.failIf(dead_contact in self.node.contacts,
'Contact was not removed after RPC timeout; check exception types.')
df.addCallback(lambda _: reset_values())
# Stop the reactor if a result arrives (timeout or not)
df.addCallback(lambda _: check_removed_contact())
self._reactor.pump([1 for _ in range(20)])
def testRPCRequest(self):
""" Tests if a valid RPC request is executed and responded to correctly """
remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
remote_node.start_listening()
remoteContact = remote_node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \
% (expectedResult, result)
# Publish the "local" node on the network
self.node.start_listening()
# Simulate the RPC
df = remoteContact.ping()
df.addCallback(handleResult)
df.addErrback(handleError)
for _ in range(10):
self._reactor.advance(1)
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for an RPC result, '
'but the transaction is already done!')
def testRPCAccess(self):
""" Tests invalid RPC requests
Verifies that an RPC request for an existing but unpublished
method is denied, and that the associated (remote) exception gets
raised locally """
remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
remote_node.start_listening()
remote_contact = remote_node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(remote_contact)
self.error = None
def handleError(f):
try:
f.raiseException()
except AttributeError, e:
# This is the expected outcome since the remote node did not publish the method
self.error = None
except Exception, e:
self.error = 'The remote method failed, but the wrong exception was raised; ' \
'expected AttributeError, got %s' % type(e)
def handleResult(result):
self.error = 'The remote method executed successfully, returning: "%s"; ' \
'this RPC should not have been allowed.' % result
self.node.start_listening()
self._reactor.pump([1 for _ in range(10)])
# Simulate the RPC
df = remote_contact.not_a_rpc_function()
df.addCallback(handleResult)
df.addErrback(handleError)
self._reactor.pump([1 for _ in range(10)])
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for an RPC result, '
'but the transaction is already done!')
def testRPCRequestArgs(self):
""" Tests if an RPC requiring arguments is executed correctly """
remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
remote_node.start_listening()
remote_contact = remote_node.contact_manager.make_contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(remote_contact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \
(expectedResult, result)
# Publish the "local" node on the network
self.node.start_listening()
# Simulate the RPC
df = remote_contact.ping()
df.addCallback(handleResult)
df.addErrback(handleError)
self._reactor.pump([1 for _ in range(10)])
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for an RPC result, '
'but the transaction is already done!')
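
These tests step the fake reactor with Clock.pump(), which advances simulated time by each interval in the list it is given. A standalone illustration of the idiom used throughout this file:

from twisted.internet.task import Clock

reactor = Clock()
fired = []
reactor.callLater(3, fired.append, "first")
reactor.callLater(7, fired.append, "second")

# advance ten simulated seconds, one second at a time, exactly like
# self._reactor.pump([1 for _ in range(10)]) in the tests above
reactor.pump([1 for _ in range(10)])
assert fired == ["first", "second"]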

@@ -1,274 +0,0 @@
import time
import logging
from twisted.trial import unittest
from twisted.internet import defer, threads, task
from lbrynet.dht.node import Node
from lbrynet.tests import mocks
from lbrynet.core.utils import generate_id
log = logging.getLogger("lbrynet.tests.util")
# log.addHandler(logging.StreamHandler())
# log.setLevel(logging.DEBUG)
class TestKademliaBase(unittest.TestCase):
timeout = 300.0 # timeout for each test
network_size = 0 # plus lbrynet1, lbrynet2, and lbrynet3 seed nodes
node_ids = None
seed_dns = mocks.MOCK_DHT_SEED_DNS
def _add_next_node(self):
node_id, node_ip = self.mock_node_generator.next()
node = Node(node_id=node_id.decode('hex'), udpPort=4444, peerPort=3333, externalIP=node_ip,
resolve=mocks.resolve, listenUDP=mocks.listenUDP, callLater=self.clock.callLater, clock=self.clock)
self.nodes.append(node)
return node
@defer.inlineCallbacks
def add_node(self):
node = self._add_next_node()
yield node.joinNetwork(
[
("lbrynet1.lbry.io", self._seeds[0].port),
("lbrynet2.lbry.io", self._seeds[1].port),
("lbrynet3.lbry.io", self._seeds[2].port),
]
)
defer.returnValue(node)
def get_node(self, node_id):
for node in self.nodes:
if node.node_id == node_id:
return node
raise KeyError(node_id)
@defer.inlineCallbacks
def pop_node(self):
node = self.nodes.pop()
yield node.stop()
def pump_clock(self, n, step=0.01):
"""
:param n: seconds to run the reactor for
:param step: reactor tick rate (in seconds)
"""
for _ in range(n * 100):
self.clock.advance(step)
def run_reactor(self, seconds, *deferreds):
dl = [threads.deferToThread(self.pump_clock, seconds)]
for d in deferreds:
dl.append(d)
return defer.DeferredList(dl)
@defer.inlineCallbacks
def setUp(self):
self.nodes = []
self._seeds = []
self.clock = task.Clock()
self.mock_node_generator = mocks.mock_node_generator(mock_node_ids=self.node_ids)
join_dl = []
for seed_dns in self.seed_dns:
other_seeds = list(self.seed_dns.keys())
other_seeds.remove(seed_dns)
self._add_next_node()
seed = self.nodes.pop()
self._seeds.append(seed)
join_dl.append(
seed.joinNetwork([(other_seed_dns, 4444) for other_seed_dns in other_seeds])
)
if self.network_size:
for _ in range(self.network_size):
join_dl.append(self.add_node())
yield self.run_reactor(1, *tuple(join_dl))
self.verify_all_nodes_are_routable()
@defer.inlineCallbacks
def tearDown(self):
dl = []
while self.nodes:
dl.append(self.pop_node()) # stop all of the nodes
while self._seeds:
dl.append(self._seeds.pop().stop()) # and the seeds
yield defer.DeferredList(dl)
def verify_all_nodes_are_routable(self):
routable = set()
node_addresses = {node.externalIP for node in self.nodes}
node_addresses = node_addresses.union({node.externalIP for node in self._seeds})
for node in self._seeds:
contact_addresses = {contact.address for contact in node.contacts}
routable.update(contact_addresses)
for node in self.nodes:
contact_addresses = {contact.address for contact in node.contacts}
routable.update(contact_addresses)
self.assertSetEqual(routable, node_addresses)
@defer.inlineCallbacks
def verify_all_nodes_are_pingable(self):
ping_replies = {}
ping_dl = []
contacted = set()
def _ping_cb(result, node, replies):
replies[node] = result
for node in self._seeds:
contact_addresses = set()
for contact in node.contacts:
contact_addresses.add(contact.address)
d = contact.ping()
d.addCallback(_ping_cb, contact.address, ping_replies)
contacted.add(contact.address)
ping_dl.append(d)
for node in self.nodes:
contact_addresses = set()
for contact in node.contacts:
contact_addresses.add(contact.address)
d = contact.ping()
d.addCallback(_ping_cb, contact.address, ping_replies)
contacted.add(contact.address)
ping_dl.append(d)
self.run_reactor(2, *ping_dl)
yield threads.deferToThread(time.sleep, 0.1)
node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds})
self.assertSetEqual(node_addresses, contacted)
self.assertDictEqual(ping_replies, {node: "pong" for node in contacted})
class TestKademliaBootstrap(TestKademliaBase):
"""
Test initializing the network / connecting the seed nodes
"""
def test_bootstrap_network(self): # simulates the real network, which has three seeds
self.assertEqual(len(self._seeds[0].contacts), 2)
self.assertEqual(len(self._seeds[1].contacts), 2)
self.assertEqual(len(self._seeds[2].contacts), 2)
self.assertSetEqual(
{self._seeds[0].contacts[0].address, self._seeds[0].contacts[1].address},
{self._seeds[1].externalIP, self._seeds[2].externalIP}
)
self.assertSetEqual(
{self._seeds[1].contacts[0].address, self._seeds[1].contacts[1].address},
{self._seeds[0].externalIP, self._seeds[2].externalIP}
)
self.assertSetEqual(
{self._seeds[2].contacts[0].address, self._seeds[2].contacts[1].address},
{self._seeds[0].externalIP, self._seeds[1].externalIP}
)
def test_all_nodes_are_pingable(self):
return self.verify_all_nodes_are_pingable()
class TestKademliaBootstrapSixteenSeeds(TestKademliaBase):
node_ids = [
'000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000',
'111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111',
'222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222',
'333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333',
'444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444',
'555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555',
'666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666',
'777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777',
'888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888',
'999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999',
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
'dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd',
'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee',
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff'
]
@defer.inlineCallbacks
def setUp(self):
self.seed_dns.update(
{
"lbrynet4.lbry.io": "10.42.42.4",
"lbrynet5.lbry.io": "10.42.42.5",
"lbrynet6.lbry.io": "10.42.42.6",
"lbrynet7.lbry.io": "10.42.42.7",
"lbrynet8.lbry.io": "10.42.42.8",
"lbrynet9.lbry.io": "10.42.42.9",
"lbrynet10.lbry.io": "10.42.42.10",
"lbrynet11.lbry.io": "10.42.42.11",
"lbrynet12.lbry.io": "10.42.42.12",
"lbrynet13.lbry.io": "10.42.42.13",
"lbrynet14.lbry.io": "10.42.42.14",
"lbrynet15.lbry.io": "10.42.42.15",
"lbrynet16.lbry.io": "10.42.42.16",
}
)
yield TestKademliaBase.setUp(self)
@defer.inlineCallbacks
def tearDown(self):
yield TestKademliaBase.tearDown(self)
def test_bootstrap_network(self):
pass
def _test_all_nodes_are_pingable(self):
return self.verify_all_nodes_are_pingable()
class Test250NodeNetwork(TestKademliaBase):
network_size = 250
def test_setup_network_and_verify_connectivity(self):
pass
def update_network(self):
import random
dl = []
announced_blobs = []
for node in self.nodes: # random events
if random.randint(0, 10000) < 75 and announced_blobs: # get peers for a blob
log.info('find blob')
blob_hash = random.choice(announced_blobs)
dl.append(node.getPeersForBlob(blob_hash))
if random.randint(0, 10000) < 25: # announce a blob
log.info('announce blob')
blob_hash = generate_id()
announced_blobs.append((blob_hash, node.node_id))
dl.append(node.announceHaveBlob(blob_hash))
random.shuffle(self.nodes)
# kill nodes
while random.randint(0, 100) > 95:
dl.append(self.pop_node())
log.info('pop node')
# add nodes
while random.randint(0, 100) > 95:
dl.append(self.add_node())
log.info('add node')
return tuple(dl), announced_blobs
@defer.inlineCallbacks
def _test_simulate_network(self):
total_blobs = []
for i in range(100):
d, blobs = self.update_network()
total_blobs.extend(blobs)
self.run_reactor(1, *d)
yield threads.deferToThread(time.sleep, 0.1)
routable = set()
node_addresses = {node.externalIP for node in self.nodes}
for node in self.nodes:
contact_addresses = {contact.address for contact in node.contacts}
routable.update(contact_addresses)
log.warning("difference: %i", len(node_addresses.difference(routable)))
log.info("blobs %i", len(total_blobs))
log.info("step %i, %i nodes", i, len(self.nodes))
self.pump_clock(100)

@@ -1,21 +1,18 @@
import base64
import struct
import io
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from twisted.internet import defer, error
from twisted.internet import defer
from twisted.python.failure import Failure
from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import RequestCanceledError
from lbrynet.core import BlobAvailability
from lbrynet.core.utils import generate_id
from lbrynet.dht.node import Node as RealNode
from lbrynet.daemon import ExchangeRateManager as ERM
from lbrynet import conf
from util import debug_kademlia_packet
KB = 2**10
PUBLIC_EXPONENT = 65537 # http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html
@@ -41,6 +38,9 @@ class Node(RealNode):
def stop(self):
return defer.succeed(None)
def start(self, known_node_addresses=None):
return self.joinNetwork(known_node_addresses)
class FakeNetwork(object):
@staticmethod
@@ -188,9 +188,15 @@ class Wallet(object):
def get_info_exchanger(self):
return PointTraderKeyExchanger(self)
def update_peer_address(self, peer, address):
pass
def get_wallet_info_query_handler_factory(self):
return PointTraderKeyQueryHandlerFactory(self)
def get_unused_address_for_peer(self, peer):
return defer.succeed("bDtL6qriyimxz71DSYjojTBsm6cpM1bqmj")
def reserve_points(self, *args):
return True
@@ -250,18 +256,12 @@ class Announcer(object):
def immediate_announce(self, *args):
pass
def run_manage_loop(self):
pass
def start(self):
pass
def stop(self):
pass
def get_next_announce_time(self):
return 0
class GenFile(io.RawIOBase):
def __init__(self, size, pattern):
@@ -410,89 +410,3 @@ def mock_conf_settings(obj, settings={}):
conf.settings = original_settings
obj.addCleanup(_reset_settings)
MOCK_DHT_NODES = [
"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF",
]
MOCK_DHT_SEED_DNS = { # these map to mock nodes 0, 1, and 2
"lbrynet1.lbry.io": "10.42.42.1",
"lbrynet2.lbry.io": "10.42.42.2",
"lbrynet3.lbry.io": "10.42.42.3",
}
def resolve(name, timeout=(1, 3, 11, 45)):
if name not in MOCK_DHT_SEED_DNS:
return defer.fail(error.DNSLookupError(name))
return defer.succeed(MOCK_DHT_SEED_DNS[name])
class MockUDPTransport(object):
def __init__(self, address, port, max_packet_size, protocol):
self.address = address
self.port = port
self.max_packet_size = max_packet_size
self._node = protocol._node
def write(self, data, address):
dest = MockNetwork.peers[address][0]
debug_kademlia_packet(data, (self.address, self.port), address, self._node)
dest.datagramReceived(data, (self.address, self.port))
class MockUDPPort(object):
def __init__(self, protocol):
self.protocol = protocol
def startListening(self, reason=None):
return self.protocol.startProtocol()
def stopListening(self, reason=None):
return self.protocol.stopProtocol()
class MockNetwork(object):
peers = {} # (interface, port): (protocol, max_packet_size)
@classmethod
def add_peer(cls, port, protocol, interface, maxPacketSize):
interface = protocol._node.externalIP
protocol.transport = MockUDPTransport(interface, port, maxPacketSize, protocol)
cls.peers[(interface, port)] = (protocol, maxPacketSize)
def listenUDP(port, protocol, interface='', maxPacketSize=8192):
MockNetwork.add_peer(port, protocol, interface, maxPacketSize)
return MockUDPPort(protocol)
def address_generator(address=(10, 42, 42, 1)):
def increment(addr):
value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]))[0] + 1
new_addr = []
for i in range(4):
new_addr.append(value % 256)
value >>= 8
return tuple(new_addr[::-1])
while True:
yield "{}.{}.{}.{}".format(*address)
address = increment(address)
def mock_node_generator(count=None, mock_node_ids=MOCK_DHT_NODES):
if mock_node_ids is None:
mock_node_ids = MOCK_DHT_NODES
for num, node_ip in enumerate(address_generator()):
if count and num >= count:
break
if num >= len(mock_node_ids):
node_id = generate_id().encode('hex')
else:
node_id = mock_node_ids[num]
yield (node_id, node_ip)

@@ -1,82 +1,55 @@
import tempfile
import shutil
from twisted.trial import unittest
from twisted.internet import defer, reactor, threads
from twisted.internet import defer, task
from lbrynet.core import utils
from lbrynet.tests.util import random_lbry_hash
from lbrynet.dht.hashannouncer import DHTHashAnnouncer
from lbrynet.core.call_later_manager import CallLaterManager
from lbrynet.database.storage import SQLiteStorage
class MocDHTNode(object):
def __init__(self, announce_will_fail=False):
# if announce_will_fail is True,
# announceHaveBlob will return empty dict
self.call_later_manager = CallLaterManager
self.call_later_manager.setup(reactor.callLater)
def __init__(self):
self.blobs_announced = 0
self.announce_will_fail = announce_will_fail
def announceHaveBlob(self, blob):
if self.announce_will_fail:
return_val = {}
else:
return_val = {blob: ["ab"*48]}
self.blobs_announced += 1
d = defer.Deferred()
self.call_later_manager.call_later(1, d.callback, return_val)
return d
return defer.succeed(True)
class MocSupplier(object):
def __init__(self, blobs_to_announce):
self.blobs_to_announce = blobs_to_announce
self.announced = False
def hashes_to_announce(self):
if not self.announced:
self.announced = True
return defer.succeed(self.blobs_to_announce)
else:
return defer.succeed([])
class DHTHashAnnouncerTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
from lbrynet.conf import initialize_settings
initialize_settings(False)
self.num_blobs = 10
self.blobs_to_announce = []
for i in range(0, self.num_blobs):
self.blobs_to_announce.append(random_lbry_hash())
self.clock = task.Clock()
self.dht_node = MocDHTNode()
self.dht_node.peerPort = 3333
self.dht_node.clock = reactor
self.db_dir = tempfile.mkdtemp()
self.storage = SQLiteStorage(self.db_dir)
yield self.storage.setup()
self.announcer = DHTHashAnnouncer(self.dht_node, self.storage, 10)
for blob_hash in self.blobs_to_announce:
yield self.storage.add_completed_blob(blob_hash, 100, 0, 1)
utils.call_later = self.clock.callLater
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333)
self.supplier = MocSupplier(self.blobs_to_announce)
self.announcer.add_supplier(self.supplier)
@defer.inlineCallbacks
def tearDown(self):
self.dht_node.call_later_manager.stop()
yield self.storage.stop()
yield threads.deferToThread(shutil.rmtree, self.db_dir)
@defer.inlineCallbacks
def test_announce_fail(self):
# test what happens when node.announceHaveBlob() returns empty dict
self.dht_node.announce_will_fail = True
d = yield self.announcer.manage()
yield d
@defer.inlineCallbacks
def test_basic(self):
d = self.announcer.immediate_announce(self.blobs_to_announce)
self.assertEqual(len(self.announcer.hash_queue), self.num_blobs)
yield d
self.announcer._announce_available_hashes()
self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS)
self.clock.advance(1)
self.assertEqual(self.dht_node.blobs_announced, self.num_blobs)
self.assertEqual(len(self.announcer.hash_queue), 0)
self.assertEqual(self.announcer.hash_queue_size(), 0)
@defer.inlineCallbacks
def test_immediate_announce(self):
# Test that immediate announce puts a hash at the front of the queue
d = self.announcer.immediate_announce(self.blobs_to_announce)
self.assertEqual(len(self.announcer.hash_queue), self.num_blobs)
self.announcer._announce_available_hashes()
blob_hash = random_lbry_hash()
self.announcer.immediate_announce([blob_hash])
self.assertEqual(len(self.announcer.hash_queue), self.num_blobs+1)
self.assertEqual(blob_hash, self.announcer.hash_queue[-1])
yield d
self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS+1)
self.assertEqual(blob_hash, self.announcer.hash_queue[0][0])

@@ -1,16 +1,24 @@
import unittest
from lbrynet.dht import contact
from twisted.internet import task
from twisted.trial import unittest
from lbrynet.core.utils import generate_id
from lbrynet.dht.contact import ContactManager
from lbrynet.dht import constants
class ContactOperatorsTest(unittest.TestCase):
""" Basic tests case for boolean operators on the Contact class """
def setUp(self):
self.firstContact = contact.Contact('firstContactID', '127.0.0.1', 1000, None, 1)
self.secondContact = contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32)
self.secondContactCopy = contact.Contact('2ndContactID', '192.168.0.1', 1000, None, 32)
self.firstContactDifferentValues = contact.Contact(
'firstContactID', '192.168.1.20', 1000, None, 50)
self.contact_manager = ContactManager()
self.node_ids = [generate_id(), generate_id(), generate_id()]
self.firstContact = self.contact_manager.make_contact(self.node_ids[1], '127.0.0.1', 1000, None, 1)
self.secondContact = self.contact_manager.make_contact(self.node_ids[0], '192.168.0.1', 1000, None, 32)
self.secondContactCopy = self.contact_manager.make_contact(self.node_ids[0], '192.168.0.1', 1000, None, 32)
self.firstContactDifferentValues = self.contact_manager.make_contact(self.node_ids[1], '192.168.1.20',
1000, None, 50)
def testNoDuplicateContactObjects(self):
self.assertTrue(self.secondContact is self.secondContactCopy)
self.assertTrue(self.firstContact is not self.firstContactDifferentValues)
def testBoolean(self):
""" Test "equals" and "not equals" comparisons """
@@ -24,15 +32,6 @@ class ContactOperatorsTest(unittest.TestCase):
self.secondContact, self.secondContactCopy,
'Different copies of the same Contact instance should be equal')
def testStringComparisons(self):
""" Test comparisons of Contact objects with str types """
self.failUnlessEqual(
'firstContactID', self.firstContact,
'The node ID string must be equal to the contact object')
self.failIfEqual(
'some random string', self.firstContact,
"The tested string should not be equal to the contact object (not equal to it's ID)")
def testIllogicalComparisons(self):
""" Test comparisons with non-Contact and non-str types """
msg = '"{}" operator: Contact object should not be equal to {} type'
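
The move to ContactManager means contacts are cached and de-duplicated, which is what testNoDuplicateContactObjects checks. A short sketch of that behaviour, mirroring the setUp above:

from lbrynet.core.utils import generate_id
from lbrynet.dht.contact import ContactManager

contact_manager = ContactManager()
node_id = generate_id()

first = contact_manager.make_contact(node_id, '192.168.0.1', 1000, None, 32)
second = contact_manager.make_contact(node_id, '192.168.0.1', 1000, None, 32)

# identical (node_id, ip, port) arguments yield the very same Contact object
assert first is second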

@@ -4,19 +4,19 @@
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
import unittest
from twisted.trial import unittest
import time
import lbrynet.dht.datastore
import lbrynet.dht.constants
import hashlib
from lbrynet.dht.datastore import DictDataStore
from lbrynet.dht import constants
class DictDataStoreTest(unittest.TestCase):
""" Basic tests case for the reference DataStore API and implementation """
def setUp(self):
self.ds = lbrynet.dht.datastore.DictDataStore()
h = hashlib.sha1()
self.ds = DictDataStore()
h = hashlib.sha384()
h.update('g')
hashKey = h.digest()
h2 = hashlib.sha1()
@@ -78,7 +78,7 @@ class DictDataStoreTest(unittest.TestCase):
h2 = hashlib.sha1()
h2.update('test2')
key2 = h2.digest()
td = lbrynet.dht.constants.dataExpireTimeout - 100
td = constants.dataExpireTimeout - 100
td2 = td + td
self.ds.addPeerToBlob(h1, 'val1', now - td, now - td, '1')
self.ds.addPeerToBlob(h1, 'val2', now - td2, now - td2, '2')
@@ -128,16 +128,3 @@ class DictDataStoreTest(unittest.TestCase):
#
# # Read back the meta-data
# for key, value in self.cases:
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(DictDataStoreTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())

@@ -4,10 +4,10 @@
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
import unittest
from twisted.trial import unittest
import lbrynet.dht.encoding
class BencodeTest(unittest.TestCase):
""" Basic tests case for the Bencode implementation """
def setUp(self):
@@ -16,7 +16,7 @@ class BencodeTest(unittest.TestCase):
self.cases = ((42, 'i42e'),
('spam', '4:spam'),
(['spam', 42], 'l4:spami42ee'),
({'foo':42, 'bar':'spam'}, 'd3:bar4:spam3:fooi42ee'),
({'foo': 42, 'bar': 'spam'}, 'd3:bar4:spam3:fooi42ee'),
# ...and now the "real life" tests
([['abc', '127.0.0.1', 1919], ['def', '127.0.0.1', 1921]],
'll3:abc9:127.0.0.1i1919eel3:def9:127.0.0.1i1921eee'))
@@ -45,12 +45,3 @@ class BencodeTest(unittest.TestCase):
for encodedValue in self.badDecoderCases:
self.failUnlessRaises(
lbrynet.dht.encoding.DecodeError, self.encoding.decode, encodedValue)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(BencodeTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())
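
For context, the round-trip these cases exercise looks roughly like the following, assuming Bencode exposes matching encode()/decode() methods (decode() is what the bad-decoder assertions above and mock_transport.py call; encode() is inferred from the test vectors):

from lbrynet.dht.encoding import Bencode

encoding = Bencode()

# one of the documented test vectors, round-tripped (method names as assumed above)
encoded = encoding.encode({'foo': 42, 'bar': 'spam'})
assert encoded == 'd3:bar4:spam3:fooi42ee'
assert encoding.decode(encoded) == {'foo': 42, 'bar': 'spam'}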

@@ -4,23 +4,41 @@
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
import unittest
from twisted.trial import unittest
import struct
from lbrynet.core.utils import generate_id
from lbrynet.dht import kbucket
import lbrynet.dht.contact as contact
from lbrynet.dht.contact import ContactManager
from lbrynet.dht import constants
def address_generator(address=(10, 42, 42, 1)):
def increment(addr):
value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]))[0] + 1
new_addr = []
for i in range(4):
new_addr.append(value % 256)
value >>= 8
return tuple(new_addr[::-1])
while True:
yield "{}.{}.{}.{}".format(*address)
address = increment(address)
class KBucketTest(unittest.TestCase):
""" Test case for the KBucket class """
def setUp(self):
self.kbucket = kbucket.KBucket(0, 2**160)
self.address_generator = address_generator()
self.contact_manager = ContactManager()
self.kbucket = kbucket.KBucket(0, 2**constants.key_bits, generate_id())
def testAddContact(self):
""" Tests if the bucket handles contact additions/updates correctly """
# Test if contacts can be added to empty list
# Add k contacts to bucket
for i in range(constants.k):
tmpContact = contact.Contact('tempContactID%d' % i, str(i), i, i)
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
self.kbucket.addContact(tmpContact)
self.failUnlessEqual(
self.kbucket._contacts[i],
@@ -28,8 +46,7 @@ class KBucketTest(unittest.TestCase):
"Contact in position %d not the same as the newly-added contact" % i)
# Test if contact is not added to full list
i += 1
tmpContact = contact.Contact('tempContactID%d' % i, str(i), i, i)
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
self.failUnlessRaises(kbucket.BucketFull, self.kbucket.addContact, tmpContact)
# Test if an existing contact is updated correctly if added again
@@ -48,14 +65,17 @@ class KBucketTest(unittest.TestCase):
# Add k-2 contacts
node_ids = []
if constants.k >= 2:
for i in range(constants.k-2):
tmpContact = contact.Contact(i, i, i, i)
node_ids.append(generate_id())
tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0, None)
self.kbucket.addContact(tmpContact)
else:
# add k contacts
for i in range(constants.k):
tmpContact = contact.Contact(i, i, i, i)
node_ids.append(generate_id())
tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0, None)
self.kbucket.addContact(tmpContact)
# try to get too many contacts
@@ -65,8 +85,8 @@ class KBucketTest(unittest.TestCase):
'Returned list should not have more than k entries!')
# verify returned contacts in list
for i in range(constants.k-2):
self.failIf(self.kbucket._contacts[i].id != i,
for node_id, i in zip(node_ids, range(constants.k-2)):
self.failIf(self.kbucket._contacts[i].id != node_id,
"Contact in position %s not same as added contact" % (str(i)))
# try to get too many contacts
@@ -89,25 +109,15 @@ class KBucketTest(unittest.TestCase):
def testRemoveContact(self):
# try remove contact from empty list
rmContact = contact.Contact('TestContactID1', '127.0.0.1', 1, 1)
rmContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
self.failUnlessRaises(ValueError, self.kbucket.removeContact, rmContact)
# Add couple contacts
for i in range(constants.k-2):
tmpContact = contact.Contact('tmpTestContactID%d' % i, str(i), i, i)
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
self.kbucket.addContact(tmpContact)
# add the contact, then remove it
self.kbucket.addContact(rmContact)
result = self.kbucket.removeContact(rmContact)
self.failIf(rmContact in self.kbucket._contacts, "Could not remove contact from bucket")
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(KBucketTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())
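
A brief sketch of the updated KBucket usage exercised above: the bucket now takes a range start, a range end and (presumably) the local node id, and contacts come from a ContactManager rather than being constructed directly:

from lbrynet.core.utils import generate_id
from lbrynet.dht import constants, kbucket
from lbrynet.dht.contact import ContactManager

contact_manager = ContactManager()
bucket = kbucket.KBucket(0, 2 ** constants.key_bits, generate_id())

# same make_contact arguments the test uses
contact = contact_manager.make_contact(generate_id(), '10.42.42.42', 4444, 0, None)
bucket.addContact(contact)
assert bucket._contacts[0] == contact  # same access pattern as the assertions above

bucket.removeContact(contact)
assert contact not in bucket._contacts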

@@ -4,7 +4,7 @@
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
import unittest
from twisted.trial import unittest
from lbrynet.dht.msgtypes import RequestMessage, ResponseMessage, ErrorMessage
from lbrynet.dht.msgformat import MessageTranslator, DefaultFormat

@@ -5,20 +5,21 @@
# See the COPYING file included in this archive
import hashlib
import unittest
from twisted.trial import unittest
import struct
from twisted.internet import protocol, defer, selectreactor
from lbrynet.dht.msgtypes import ResponseMessage
import lbrynet.dht.node
import lbrynet.dht.constants
import lbrynet.dht.datastore
from lbrynet.dht.node import Node
from lbrynet.dht import constants
from lbrynet.dht.datastore import DictDataStore
from lbrynet.dht.routingtable import TreeRoutingTable
class NodeIDTest(unittest.TestCase):
""" Test case for the Node class's ID """
def setUp(self):
self.node = lbrynet.dht.node.Node()
self.node = Node()
def testAutoCreatedID(self):
""" Tests if a new node has a valid node ID """
@@ -49,12 +50,10 @@ class NodeIDTest(unittest.TestCase):
class NodeDataTest(unittest.TestCase):
""" Test case for the Node class's data-related functions """
def setUp(self):
import lbrynet.dht.contact
h = hashlib.sha384()
h.update('test')
self.node = lbrynet.dht.node.Node()
self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345,
self.node._protocol)
self.node = Node()
self.contact = self.node.contact_manager.make_contact(h.digest(), '127.0.0.1', 12345, self.node._protocol)
self.token = self.node.make_token(self.contact.compact_ip())
self.cases = []
for i in xrange(5):
@@ -65,13 +64,8 @@ class NodeDataTest(unittest.TestCase):
@defer.inlineCallbacks
def testStore(self):
""" Tests if the node can store (and privately retrieve) some data """
for key, value in self.cases:
request = {
'port': value,
'lbryid': self.contact.id,
'token': self.token
}
yield self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact)
for key, port in self.cases:
yield self.node.store(self.contact, key, self.token, port, self.contact.id)
for key, value in self.cases:
expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + \
self.contact.id
@@ -85,189 +79,185 @@ class NodeDataTest(unittest.TestCase):
class NodeContactTest(unittest.TestCase):
""" Test case for the Node class's contact management-related functions """
def setUp(self):
self.node = lbrynet.dht.node.Node()
self.node = Node()
@defer.inlineCallbacks
def testAddContact(self):
""" Tests if a contact can be added and retrieved correctly """
import lbrynet.dht.contact
# Create the contact
h = hashlib.sha384()
h.update('node1')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.node._protocol)
contact = self.node.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.node._protocol)
# Now add it...
self.node.addContact(contact)
yield self.node.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k)
closestNodes = self.node._routingTable.findCloseNodes(contactID, constants.k)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong number of contacts returned; '
'expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issuing '
'_findCloseNodes()')
@defer.inlineCallbacks
def testAddSelfAsContact(self):
""" Tests the node's behaviour when attempting to add itself as a contact """
import lbrynet.dht.contact
# Create a contact with the same ID as the local node's ID
contact = lbrynet.dht.contact.Contact(self.node.node_id, '127.0.0.1', 91824, None)
contact = self.node.contact_manager.make_contact(self.node.node_id, '127.0.0.1', 91824, None)
# Now try to add it
self.node.addContact(contact)
yield self.node.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(self.node.node_id,
lbrynet.dht.constants.k)
constants.k)
self.failIf(contact in closestNodes, 'Node added itself as a contact')
class FakeRPCProtocol(protocol.DatagramProtocol):
def __init__(self):
self.reactor = selectreactor.SelectReactor()
self.testResponse = None
self.network = None
def createNetwork(self, contactNetwork):
"""
set up a list of contacts together with their closest contacts
@param contactNetwork: a sequence of tuples, each containing a contact together with its
closest contacts: C{(<contact>, <closest contact 1, ...,closest contact n>)}
"""
self.network = contactNetwork
def sendRPC(self, contact, method, args, rawResponse=False):
""" Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs"""
h = hashlib.sha384()
h.update('rpcId')
rpc_id = h.digest()[:20]
if method == "findNode":
# get the specific contacts closest contacts
closestContacts = []
closestContactsList = []
for contactTuple in self.network:
if contact == contactTuple[0]:
# get the list of closest contacts for this contact
closestContactsList = contactTuple[1]
# Pack the closest contacts into a ResponseMessage
for closeContact in closestContactsList:
closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
message = ResponseMessage(rpc_id, contact.id, closestContacts)
df = defer.Deferred()
df.callback((message, (contact.address, contact.port)))
return df
elif method == "findValue":
for contactTuple in self.network:
if contact == contactTuple[0]:
# Get the data stored by this remote contact
dataDict = contactTuple[2]
dataKey = dataDict.keys()[0]
data = dataDict.get(dataKey)
# Check if this contact has the requested value
if dataKey == args[0]:
# Return the data value
response = dataDict
print "data found at contact: " + contact.id
else:
# Return the closest contact to the requested data key
print "data not found at contact: " + contact.id
closeContacts = contactTuple[1]
closestContacts = []
for closeContact in closeContacts:
closestContacts.append((closeContact.id, closeContact.address,
closeContact.port))
response = closestContacts
# Create the response message
message = ResponseMessage(rpc_id, contact.id, response)
df = defer.Deferred()
df.callback((message, (contact.address, contact.port)))
return df
def _send(self, data, rpcID, address):
""" fake sending data """
class NodeLookupTest(unittest.TestCase):
""" Test case for the Node class's iterativeFind node lookup algorithm """
def setUp(self):
# create a fake protocol to imitate communication with other nodes
self._protocol = FakeRPCProtocol()
# Note: The reactor is never started for this test. All deferred calls run sequentially,
# since there is no asynchronous network communication
# create the node to be tested in isolation
h = hashlib.sha384()
h.update('node1')
node_id = str(h.digest())
self.node = lbrynet.dht.node.Node(node_id=node_id, udpPort=4000, networkProtocol=self._protocol)
self.updPort = 81173
self.contactsAmount = 80
# Reinitialise the routing table
self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(
self.node.node_id)
# create 160 bit node ID's for test purposes
self.testNodeIDs = []
idNum = int(self.node.node_id.encode('hex'), 16)
for i in range(self.contactsAmount):
# create the testNodeIDs in ascending order, away from the actual node ID,
# with regards to the distance metric
self.testNodeIDs.append(str("%X" % (idNum + i + 1)).decode('hex'))
# generate contacts
self.contacts = []
for i in range(self.contactsAmount):
contact = lbrynet.dht.contact.Contact(self.testNodeIDs[i], "127.0.0.1",
self.updPort + i + 1, self._protocol)
self.contacts.append(contact)
# create the network of contacts in format: (contact, closest contacts)
contactNetwork = ((self.contacts[0], self.contacts[8:15]),
(self.contacts[1], self.contacts[16:23]),
(self.contacts[2], self.contacts[24:31]),
(self.contacts[3], self.contacts[32:39]),
(self.contacts[4], self.contacts[40:47]),
(self.contacts[5], self.contacts[48:55]),
(self.contacts[6], self.contacts[56:63]),
(self.contacts[7], self.contacts[64:71]),
(self.contacts[8], self.contacts[72:79]),
(self.contacts[40], self.contacts[41:48]),
(self.contacts[41], self.contacts[41:48]),
(self.contacts[42], self.contacts[41:48]),
(self.contacts[43], self.contacts[41:48]),
(self.contacts[44], self.contacts[41:48]),
(self.contacts[45], self.contacts[41:48]),
(self.contacts[46], self.contacts[41:48]),
(self.contacts[47], self.contacts[41:48]),
(self.contacts[48], self.contacts[41:48]),
(self.contacts[50], self.contacts[0:7]),
(self.contacts[51], self.contacts[8:15]),
(self.contacts[52], self.contacts[16:23]))
contacts_with_datastores = []
for contact_tuple in contactNetwork:
contacts_with_datastores.append((contact_tuple[0], contact_tuple[1],
lbrynet.dht.datastore.DictDataStore()))
self._protocol.createNetwork(contacts_with_datastores)
@defer.inlineCallbacks
def testNodeBootStrap(self):
""" Test bootstrap with the closest possible contacts """
activeContacts = yield self.node._iterativeFind(self.node.node_id, self.contacts[0:8])
# Set the expected result
expectedResult = set()
for item in self.contacts[0:6]:
expectedResult.add(item.id)
# Get the result from the deferred
# Check the length of the active contacts
self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(),
"More active contacts should exist, there should be %d "
"contacts but there are %d" % (len(expectedResult),
len(activeContacts)))
# Check that the received active contacts are the same as the input contacts
self.failUnlessEqual({contact.id for contact in activeContacts}, expectedResult,
"Active should only contain the closest possible contacts"
" which were used as input for the boostrap")
# class FakeRPCProtocol(protocol.DatagramProtocol):
# def __init__(self):
# self.reactor = selectreactor.SelectReactor()
# self.testResponse = None
# self.network = None
#
# def createNetwork(self, contactNetwork):
# """
# set up a list of contacts together with their closest contacts
# @param contactNetwork: a sequence of tuples, each containing a contact together with its
# closest contacts: C{(<contact>, <closest contact 1, ...,closest contact n>)}
# """
# self.network = contactNetwork
#
# def sendRPC(self, contact, method, args, rawResponse=False):
# """ Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs"""
#
# h = hashlib.sha384()
# h.update('rpcId')
# rpc_id = h.digest()[:20]
#
# if method == "findNode":
# # get the specific contacts closest contacts
# closestContacts = []
# closestContactsList = []
# for contactTuple in self.network:
# if contact == contactTuple[0]:
# # get the list of closest contacts for this contact
# closestContactsList = contactTuple[1]
# # Pack the closest contacts into a ResponseMessage
# for closeContact in closestContactsList:
# closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
#
# message = ResponseMessage(rpc_id, contact.id, closestContacts)
# df = defer.Deferred()
# df.callback((message, (contact.address, contact.port)))
# return df
# elif method == "findValue":
# for contactTuple in self.network:
# if contact == contactTuple[0]:
# # Get the data stored by this remote contact
# dataDict = contactTuple[2]
# dataKey = dataDict.keys()[0]
# data = dataDict.get(dataKey)
# # Check if this contact has the requested value
# if dataKey == args[0]:
# # Return the data value
# response = dataDict
# print "data found at contact: " + contact.id
# else:
# # Return the closest contact to the requested data key
# print "data not found at contact: " + contact.id
# closeContacts = contactTuple[1]
# closestContacts = []
# for closeContact in closeContacts:
# closestContacts.append((closeContact.id, closeContact.address,
# closeContact.port))
# response = closestContacts
#
# # Create the response message
# message = ResponseMessage(rpc_id, contact.id, response)
# df = defer.Deferred()
# df.callback((message, (contact.address, contact.port)))
# return df
#
# def _send(self, data, rpcID, address):
# """ fake sending data """
#
#
# class NodeLookupTest(unittest.TestCase):
# """ Test case for the Node class's iterativeFind node lookup algorithm """
#
# def setUp(self):
# # create a fake protocol to imitate communication with other nodes
# self._protocol = FakeRPCProtocol()
# # Note: The reactor is never started for this test. All deferred calls run sequentially,
# # since there is no asynchronous network communication
# # create the node to be tested in isolation
# h = hashlib.sha384()
# h.update('node1')
# node_id = str(h.digest())
# self.node = Node(node_id, 4000, None, None, self._protocol)
# self.updPort = 81173
# self.contactsAmount = 80
# # Reinitialise the routing table
# self.node._routingTable = TreeRoutingTable(self.node.node_id)
#
# # create 160 bit node ID's for test purposes
# self.testNodeIDs = []
# idNum = int(self.node.node_id.encode('hex'), 16)
# for i in range(self.contactsAmount):
# # create the testNodeIDs in ascending order, away from the actual node ID,
# # with regards to the distance metric
# self.testNodeIDs.append(str("%X" % (idNum + i + 1)).decode('hex'))
#
# # generate contacts
# self.contacts = []
# for i in range(self.contactsAmount):
# contact = self.node.contact_manager.make_contact(self.testNodeIDs[i], "127.0.0.1",
# self.updPort + i + 1, self._protocol)
# self.contacts.append(contact)
#
# # create the network of contacts in format: (contact, closest contacts)
# contactNetwork = ((self.contacts[0], self.contacts[8:15]),
# (self.contacts[1], self.contacts[16:23]),
# (self.contacts[2], self.contacts[24:31]),
# (self.contacts[3], self.contacts[32:39]),
# (self.contacts[4], self.contacts[40:47]),
# (self.contacts[5], self.contacts[48:55]),
# (self.contacts[6], self.contacts[56:63]),
# (self.contacts[7], self.contacts[64:71]),
# (self.contacts[8], self.contacts[72:79]),
# (self.contacts[40], self.contacts[41:48]),
# (self.contacts[41], self.contacts[41:48]),
# (self.contacts[42], self.contacts[41:48]),
# (self.contacts[43], self.contacts[41:48]),
# (self.contacts[44], self.contacts[41:48]),
# (self.contacts[45], self.contacts[41:48]),
# (self.contacts[46], self.contacts[41:48]),
# (self.contacts[47], self.contacts[41:48]),
# (self.contacts[48], self.contacts[41:48]),
# (self.contacts[50], self.contacts[0:7]),
# (self.contacts[51], self.contacts[8:15]),
# (self.contacts[52], self.contacts[16:23]))
#
# contacts_with_datastores = []
#
# for contact_tuple in contactNetwork:
# contacts_with_datastores.append((contact_tuple[0], contact_tuple[1],
# DictDataStore()))
# self._protocol.createNetwork(contacts_with_datastores)
#
# # @defer.inlineCallbacks
# # def testNodeBootStrap(self):
# # """ Test bootstrap with the closest possible contacts """
# # # Set the expected result
# # expectedResult = {item.id for item in self.contacts[0:8]}
# #
# # activeContacts = yield self.node._iterativeFind(self.node.node_id, self.contacts[0:8])
# #
# # # Check the length of the active contacts
# # self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(),
# # "More active contacts should exist, there should be %d "
# # "contacts but there are %d" % (len(expectedResult),
# # len(activeContacts)))
# #
# # # Check that the received active contacts are the same as the input contacts
# # self.failUnlessEqual({contact.id for contact in activeContacts}, expectedResult,
# # "Active should only contain the closest possible contacts"
# #                                          " which were used as input for the bootstrap")
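The commented-out FakeRPCProtocol above boils down to one pattern the reorganized tests keep relying on: sendRPC never touches the network, it simply fires a deferred with a canned ResponseMessage and the "remote" address. A minimal sketch of that pattern, assuming only the three-argument ResponseMessage(rpc_id, sender_id, response) call shape used above (the CannedRPCProtocol name is illustrative):

import hashlib

from twisted.internet import defer
from lbrynet.dht.msgtypes import ResponseMessage


class CannedRPCProtocol(object):
    """Answers every RPC immediately with a pre-built response; no UDP involved."""

    def __init__(self, canned_response):
        self.canned_response = canned_response

    def sendRPC(self, contact, method, args, rawResponse=False):
        # derive a fake rpc id the same way the commented-out protocol does
        rpc_id = hashlib.sha384('rpcId').digest()[:20]
        message = ResponseMessage(rpc_id, contact.id, self.canned_response)
        # resolve synchronously; no reactor needs to run for the caller to get a result
        return defer.succeed((message, (contact.address, contact.port)))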

View file

@@ -1,200 +1,167 @@
import time
import unittest
from twisted.internet.task import Clock
from twisted.internet import defer
import lbrynet.dht.protocol
import lbrynet.dht.contact
import lbrynet.dht.constants
import lbrynet.dht.msgtypes
from lbrynet.dht.error import TimeoutError
from lbrynet.dht.node import Node, rpcmethod
from lbrynet.tests.mocks import listenUDP, resolve
from lbrynet.core.call_later_manager import CallLaterManager
import logging
log = logging.getLogger()
class KademliaProtocolTest(unittest.TestCase):
""" Test case for the Protocol class """
udpPort = 9182
def setUp(self):
self._reactor = Clock()
CallLaterManager.setup(self._reactor.callLater)
self.node = Node(node_id='1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
def tearDown(self):
CallLaterManager.stop()
del self._reactor
@defer.inlineCallbacks
def testReactor(self):
""" Tests if the reactor can start/stop the protocol correctly """
d = defer.Deferred()
self._reactor.callLater(1, d.callback, True)
self._reactor.advance(1)
result = yield d
self.assertTrue(result)
def testRPCTimeout(self):
""" Tests if a RPC message sent to a dead remote node times out correctly """
dead_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
dead_node.start_listening()
dead_node.stop()
self._reactor.pump([1 for _ in range(10)])
dead_contact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(dead_contact)
@rpcmethod
def fake_ping(*args, **kwargs):
time.sleep(lbrynet.dht.constants.rpcTimeout + 1)
return 'pong'
real_ping = self.node.ping
real_timeout = lbrynet.dht.constants.rpcTimeout
real_attempts = lbrynet.dht.constants.rpcAttempts
lbrynet.dht.constants.rpcAttempts = 1
lbrynet.dht.constants.rpcTimeout = 1
self.node.ping = fake_ping
# Make sure the contact was added
self.failIf(dead_contact not in self.node.contacts,
'Contact not added to fake node (error in test code)')
self.node.start_listening()
# Run the PING RPC (which should raise a timeout error)
df = self.node._protocol.sendRPC(dead_contact, 'ping', {})
def check_timeout(err):
self.assertEqual(err.type, TimeoutError)
df.addErrback(check_timeout)
def reset_values():
self.node.ping = real_ping
lbrynet.dht.constants.rpcTimeout = real_timeout
lbrynet.dht.constants.rpcAttempts = real_attempts
# See if the contact was removed due to the timeout
def check_removed_contact():
self.failIf(dead_contact in self.node.contacts,
'Contact was not removed after RPC timeout; check exception types.')
df.addCallback(lambda _: reset_values())
# Once a result arrives (timeout or not), verify the contact was dropped
df.addCallback(lambda _: check_removed_contact())
self._reactor.pump([1 for _ in range(20)])
def testRPCRequest(self):
""" Tests if a valid RPC request is executed and responded to correctly """
remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
remote_node.start_listening()
remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \
% (expectedResult, result)
# Publish the "local" node on the network
self.node.start_listening()
# Simulate the RPC
df = remoteContact.ping()
df.addCallback(handleResult)
df.addErrback(handleError)
for _ in range(10):
self._reactor.advance(1)
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
def testRPCAccess(self):
""" Tests invalid RPC requests
Verifies that an RPC request for an existing but unpublished
method is denied, and that the associated (remote) exception gets
raised locally """
remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
remote_node.start_listening()
remote_contact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(remote_contact)
self.error = None
def handleError(f):
try:
f.raiseException()
except AttributeError, e:
# This is the expected outcome since the remote node did not publish the method
self.error = None
except Exception, e:
self.error = 'The remote method failed, but the wrong exception was raised; ' \
'expected AttributeError, got %s' % type(e)
def handleResult(result):
self.error = 'The remote method executed successfully, returning: "%s"; ' \
'this RPC should not have been allowed.' % result
self.node.start_listening()
self._reactor.pump([1 for _ in range(10)])
# Simulate the RPC
df = remote_contact.not_a_rpc_function()
df.addCallback(handleResult)
df.addErrback(handleError)
self._reactor.pump([1 for _ in range(10)])
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
def testRPCRequestArgs(self):
""" Tests if an RPC requiring arguments is executed correctly """
remote_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
remote_node.start_listening()
remote_contact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.2', 9182, self.node._protocol)
self.node.addContact(remote_contact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \
(expectedResult, result)
# Publish the "local" node on the network
self.node.start_listening()
# Simulate the RPC
df = remote_contact.ping()
df.addCallback(handleResult)
df.addErrback(handleError)
self._reactor.pump([1 for _ in range(10)])
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
# import time
# import unittest
# import twisted.internet.selectreactor
#
# import lbrynet.dht.protocol
# import lbrynet.dht.contact
# import lbrynet.dht.constants
# import lbrynet.dht.msgtypes
# from lbrynet.dht.error import TimeoutError
# from lbrynet.dht.node import Node, rpcmethod
#
#
# class KademliaProtocolTest(unittest.TestCase):
# """ Test case for the Protocol class """
#
# def setUp(self):
# del lbrynet.dht.protocol.reactor
# lbrynet.dht.protocol.reactor = twisted.internet.selectreactor.SelectReactor()
# self.node = Node(node_id='1' * 48, udpPort=9182, externalIP="127.0.0.1")
# self.protocol = lbrynet.dht.protocol.KademliaProtocol(self.node)
#
# def testReactor(self):
# """ Tests if the reactor can start/stop the protocol correctly """
# lbrynet.dht.protocol.reactor.listenUDP(0, self.protocol)
# lbrynet.dht.protocol.reactor.callLater(0, lbrynet.dht.protocol.reactor.stop)
# lbrynet.dht.protocol.reactor.run()
#
# def testRPCTimeout(self):
# """ Tests if a RPC message sent to a dead remote node times out correctly """
#
# @rpcmethod
# def fake_ping(*args, **kwargs):
# time.sleep(lbrynet.dht.constants.rpcTimeout + 1)
# return 'pong'
#
# real_ping = self.node.ping
# real_timeout = lbrynet.dht.constants.rpcTimeout
# real_attempts = lbrynet.dht.constants.rpcAttempts
# lbrynet.dht.constants.rpcAttempts = 1
# lbrynet.dht.constants.rpcTimeout = 1
# self.node.ping = fake_ping
# deadContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
# self.node.addContact(deadContact)
# # Make sure the contact was added
# self.failIf(deadContact not in self.node.contacts,
# 'Contact not added to fake node (error in test code)')
# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
#
# # Run the PING RPC (which should raise a timeout error)
# df = self.protocol.sendRPC(deadContact, 'ping', {})
#
# def check_timeout(err):
# self.assertEqual(type(err), TimeoutError)
#
# df.addErrback(check_timeout)
#
# def reset_values():
# self.node.ping = real_ping
# lbrynet.dht.constants.rpcTimeout = real_timeout
# lbrynet.dht.constants.rpcAttempts = real_attempts
#
# # See if the contact was removed due to the timeout
# def check_removed_contact():
# self.failIf(deadContact in self.node.contacts,
# 'Contact was not removed after RPC timeout; check exception types.')
#
# df.addCallback(lambda _: reset_values())
#
# # Stop the reactor if a result arrives (timeout or not)
# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
# df.addCallback(lambda _: check_removed_contact())
# lbrynet.dht.protocol.reactor.run()
#
# def testRPCRequest(self):
# """ Tests if a valid RPC request is executed and responded to correctly """
# remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
# self.node.addContact(remoteContact)
# self.error = None
#
# def handleError(f):
# self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
#
# def handleResult(result):
# expectedResult = 'pong'
# if result != expectedResult:
# self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \
# % (expectedResult, result)
#
# # Publish the "local" node on the network
# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# # Simulate the RPC
# df = remoteContact.ping()
# df.addCallback(handleResult)
# df.addErrback(handleError)
# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
# lbrynet.dht.protocol.reactor.run()
# self.failIf(self.error, self.error)
# # The list of sent RPC messages should be empty at this stage
# self.failUnlessEqual(len(self.protocol._sentMessages), 0,
# 'The protocol is still waiting for a RPC result, '
# 'but the transaction is already done!')
#
# def testRPCAccess(self):
# """ Tests invalid RPC requests
# Verifies that a RPC request for an existing but unpublished
# method is denied, and that the associated (remote) exception gets
# raised locally """
# remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
# self.node.addContact(remoteContact)
# self.error = None
#
# def handleError(f):
# try:
# f.raiseException()
# except AttributeError, e:
# # This is the expected outcome since the remote node did not publish the method
# self.error = None
# except Exception, e:
# self.error = 'The remote method failed, but the wrong exception was raised; ' \
# 'expected AttributeError, got %s' % type(e)
#
# def handleResult(result):
# self.error = 'The remote method executed successfully, returning: "%s"; ' \
# 'this RPC should not have been allowed.' % result
#
# # Publish the "local" node on the network
# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# # Simulate the RPC
# df = remoteContact.not_a_rpc_function()
# df.addCallback(handleResult)
# df.addErrback(handleError)
# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
# lbrynet.dht.protocol.reactor.run()
# self.failIf(self.error, self.error)
# # The list of sent RPC messages should be empty at this stage
# self.failUnlessEqual(len(self.protocol._sentMessages), 0,
# 'The protocol is still waiting for a RPC result, '
# 'but the transaction is already done!')
#
# def testRPCRequestArgs(self):
# """ Tests if an RPC requiring arguments is executed correctly """
# remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
# self.node.addContact(remoteContact)
# self.error = None
#
# def handleError(f):
# self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
#
# def handleResult(result):
# expectedResult = 'pong'
# if result != expectedResult:
# self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \
# (expectedResult, result)
#
# # Publish the "local" node on the network
# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# # Simulate the RPC
# df = remoteContact.ping()
# df.addCallback(handleResult)
# df.addErrback(handleError)
# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
# lbrynet.dht.protocol.reactor.run()
# self.failIf(self.error, self.error)
# # The list of sent RPC messages should be empty at this stage
# self.failUnlessEqual(len(self.protocol._sentMessages), 0,
# 'The protocol is still waiting for a RPC result, '
# 'but the transaction is already done!')
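The rewritten KademliaProtocolTest above drops the real SelectReactor that the commented-out original (kept below the new code for reference) had to start and stop, and drives everything from a twisted.internet.task.Clock instead: work is scheduled with callLater and time is advanced explicitly, so the RPC timeout tests run deterministically. A minimal, DHT-independent sketch of that pattern:

from twisted.internet import defer, task


def clock_pattern_demo():
    clock = task.Clock()
    results = []

    d = defer.Deferred()
    d.addCallback(results.append)

    # schedule the callback five fake seconds in the future
    clock.callLater(5, d.callback, 'pong')

    clock.advance(4)
    assert not results            # not due yet
    clock.advance(1)
    assert results == ['pong']    # fires once the fake clock reaches t=5
    return results


clock_pattern_demo()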

View file

@@ -1,29 +1,16 @@
import hashlib
import unittest
import lbrynet.dht.constants
import lbrynet.dht.routingtable
import lbrynet.dht.contact
import lbrynet.dht.node
import lbrynet.dht.distance
from twisted.trial import unittest
from twisted.internet import defer
from lbrynet.dht import constants
from lbrynet.dht.routingtable import TreeRoutingTable
from lbrynet.dht.contact import ContactManager
from lbrynet.dht.distance import Distance
class FakeRPCProtocol(object):
""" Fake RPC protocol; allows lbrynet.dht.contact.Contact objects to "send" RPCs """
def sendRPC(self, *args, **kwargs):
return FakeDeferred()
class FakeDeferred(object):
""" Fake Twisted Deferred object; allows the routing table to add callbacks that do nothing """
def addCallback(self, *args, **kwargs):
return
def addErrback(self, *args, **kwargs):
return
def addCallbacks(self, *args, **kwargs):
return
return defer.succeed(None)
class TreeRoutingTableTest(unittest.TestCase):
@@ -31,9 +18,10 @@ class TreeRoutingTableTest(unittest.TestCase):
def setUp(self):
h = hashlib.sha384()
h.update('node1')
self.contact_manager = ContactManager()
self.nodeID = h.digest()
self.protocol = FakeRPCProtocol()
self.routingTable = lbrynet.dht.routingtable.TreeRoutingTable(self.nodeID)
self.routingTable = TreeRoutingTable(self.nodeID)
def testDistance(self):
""" Test to see if distance method returns correct result"""
@@ -42,86 +30,91 @@ class TreeRoutingTableTest(unittest.TestCase):
basicTestList = [('123456789', '123456789', 0L), ('12345', '98765', 34527773184L)]
for test in basicTestList:
result = lbrynet.dht.distance.Distance(test[0])(test[1])
result = Distance(test[0])(test[1])
self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' %
(test[2], result))
baseIp = '146.64.19.111'
ipTestList = ['146.64.29.222', '192.68.19.333']
distanceOne = lbrynet.dht.distance.Distance(baseIp)(ipTestList[0])
distanceTwo = lbrynet.dht.distance.Distance(baseIp)(ipTestList[1])
distanceOne = Distance(baseIp)(ipTestList[0])
distanceTwo = Distance(baseIp)(ipTestList[1])
self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' %
(ipTestList[0], baseIp, ipTestList[1]))
@defer.inlineCallbacks
def testAddContact(self):
""" Tests if a contact can be added and retrieved correctly """
# Create the contact
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
contact = self.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.protocol)
# Now add it...
self.routingTable.addContact(contact)
yield self.routingTable.addContact(contact)
# ...and request the closest nodes to it (will retrieve it)
closestNodes = self.routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k)
closestNodes = self.routingTable.findCloseNodes(contactID, constants.k)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,'
' got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issuing '
'_findCloseNodes()')
@defer.inlineCallbacks
def testGetContact(self):
""" Tests if a specific existing contact can be retrieved correctly """
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
contact = self.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.protocol)
# Now add it...
self.routingTable.addContact(contact)
yield self.routingTable.addContact(contact)
# ...and get it again
sameContact = self.routingTable.getContact(contactID)
self.failUnlessEqual(contact, sameContact, 'getContact() should return the same contact')
@defer.inlineCallbacks
def testAddParentNodeAsContact(self):
"""
Tests the routing table's behaviour when attempting to add its parent node as a contact
"""
# Create a contact with the same ID as the local node's ID
contact = lbrynet.dht.contact.Contact(self.nodeID, '127.0.0.1', 91824, self.protocol)
contact = self.contact_manager.make_contact(self.nodeID, '127.0.0.1', 91824, self.protocol)
# Now try to add it
self.routingTable.addContact(contact)
yield self.routingTable.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.routingTable.findCloseNodes(self.nodeID, lbrynet.dht.constants.k)
closestNodes = self.routingTable.findCloseNodes(self.nodeID, constants.k)
self.failIf(contact in closestNodes, 'Node added itself as a contact')
@defer.inlineCallbacks
def testRemoveContact(self):
""" Tests contact removal """
# Create the contact
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
contact = self.contact_manager.make_contact(contactID, '127.0.0.1', 91824, self.protocol)
# Now add it...
self.routingTable.addContact(contact)
yield self.routingTable.addContact(contact)
# Verify addition
self.failUnlessEqual(len(self.routingTable._buckets[0]), 1, 'Contact not added properly')
# Now remove it
self.routingTable.removeContact(contact.id)
self.routingTable.removeContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets[0]), 0, 'Contact not removed properly')
@defer.inlineCallbacks
def testSplitBucket(self):
""" Tests if the the routing table correctly dynamically splits k-buckets """
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**384,
'Initial k-bucket range should be 0 <= range < 2**384')
# Add k contacts
for i in range(lbrynet.dht.constants.k):
for i in range(constants.k):
h = hashlib.sha384()
h.update('remote node %d' % i)
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1,
'Only k nodes have been added; the first k-bucket should now '
'be full, but should not yet be split')
@@ -129,8 +122,8 @@ class TreeRoutingTableTest(unittest.TestCase):
h = hashlib.sha384()
h.update('yet another remote node')
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 2,
'k+1 nodes have been added; the first k-bucket should have been '
'split into two new buckets')
@@ -144,99 +137,113 @@ class TreeRoutingTableTest(unittest.TestCase):
'K-bucket was split, but the min/max ranges were '
'not divided properly')
def testFullBucketNoSplit(self):
@defer.inlineCallbacks
def testFullSplit(self):
"""
Test that a bucket is not split if it is full but does not cover the range
containing the parent node's ID
"""
self.routingTable._parentNodeID = 49 * 'a'
# more than 384 bits; this will not be in the range of _any_ k-bucket
node_ids = [
"d4a27096d81e3c4efacce9f940e887c956f736f859c8037b556efec6fdda5c388ae92bae96b9eb204b24da2f376c4282",
"553c0bfe119c35247c8cb8124091acb5c05394d5be7b019f6b1a5e18036af7a6148711ad6d47a0f955047bf9eac868aa",
"671a179c251c90863f46e7ef54264cbbad743fe3127871064d8f051ce4124fcbd893339e11358f621655e37bd6a74097",
"f896bafeb7ffb14b92986e3b08ee06807fdd5be34ab43f4f52559a5bbf0f12dedcd8556801f97c334b3ac9be7a0f7a93",
"33a7deb380eb4707211184798b66840c22c396e8cde00b75b64f9ead09bad1141b56d35a93bd511adb28c6708eecc39d",
"5e1e8ca575b536ae5ec52f7766ada904a64ebaad805909b1067ec3c984bf99909c9fcdd37e04ea5c5c043ea8830100ce",
"ee18857d0c1f7fc413424f3ffead4871f2499646d4c2ac16f35f0c8864318ca21596915f18f85a3a25f8ceaa56c844aa",
"68039f78fbf130873e7cce2f71f39d217dcb7f3fe562d64a85de4e21ee980b4a800f51bf6851d2bbf10e6590fe0d46b2"
]
# Add k contacts
for i in range(lbrynet.dht.constants.k):
for i in range(constants.k):
h = hashlib.sha384()
h.update('remote node %d' % i)
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; '
'the first k-bucket should now be '
'full, and there should not be '
'more than 1 bucket')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k,
'Bucket should have k contacts; expected %d got %d' %
(lbrynet.dht.constants.k,
len(self.routingTable._buckets[0]._contacts)))
# Now add 1 more contact
self.assertEquals(nodeID, node_ids[i].decode('hex'))
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1)
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), constants.k)
# try adding a contact who is further from us than the k'th known contact
h = hashlib.sha384()
h.update('yet another remote node!')
nodeID = h.digest()
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1)
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), constants.k)
self.failIf(contact in self.routingTable._buckets[0]._contacts)
# try adding a contact who is closer to us than the k'th known contact
h = hashlib.sha384()
h.update('yet another remote node')
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1,
'There should not be more than 1 bucket, since the bucket '
'should not have been split (parent node ID not in range)')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts),
lbrynet.dht.constants.k, 'Bucket should have k contacts; '
'expected %d got %d' %
(lbrynet.dht.constants.k,
len(self.routingTable._buckets[0]._contacts)))
self.failIf(contact in self.routingTable._buckets[0]._contacts,
'New contact should have been discarded (since RPC is faked in this test)')
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 91824, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 2)
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), 5)
self.failUnlessEqual(len(self.routingTable._buckets[1]._contacts), 4)
self.failIf(contact not in self.routingTable._buckets[1]._contacts)
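testFullSplit above pins the expected node ids as hex strings and asserts, before each addContact, that hashing 'remote node %d' with sha384 still produces them, so any change in how the test derives ids fails immediately. Under the assumption that the pinned list was generated exactly that way (it has eight entries here, which is what constants.k works out to in these tests), it can be regenerated with:

import hashlib

# rebuild the hex node ids asserted by testFullSplit
pinned_node_ids = [hashlib.sha384('remote node %d' % i).hexdigest() for i in range(8)]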
class KeyErrorFixedTest(unittest.TestCase):
""" Basic tests case for boolean operators on the Contact class """
def setUp(self):
own_id = (2 ** lbrynet.dht.constants.key_bits) - 1
# carefully chosen own_id. here's the logic
# we want a bunch of buckets (k+1, to be exact), and we want to make sure own_id
# is not in bucket 0. so we put own_id at the end so we can keep splitting by adding to the
# end
self.table = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(own_id)
def fill_bucket(self, bucket_min):
bucket_size = lbrynet.dht.constants.k
for i in range(bucket_min, bucket_min + bucket_size):
self.table.addContact(lbrynet.dht.contact.Contact(long(i), '127.0.0.1', 9999, None))
def overflow_bucket(self, bucket_min):
bucket_size = lbrynet.dht.constants.k
self.fill_bucket(bucket_min)
self.table.addContact(
lbrynet.dht.contact.Contact(long(bucket_min + bucket_size + 1),
'127.0.0.1', 9999, None))
def testKeyError(self):
# find middle, so we know where bucket will split
bucket_middle = self.table._buckets[0].rangeMax / 2
# fill last bucket
self.fill_bucket(self.table._buckets[0].rangeMax - lbrynet.dht.constants.k - 1)
# -1 in previous line because own_id is in last bucket
# fill/overflow 7 more buckets
bucket_start = 0
for i in range(0, lbrynet.dht.constants.k):
self.overflow_bucket(bucket_start)
bucket_start += bucket_middle / (2 ** i)
# replacement cache now has k-1 entries.
# adding one more contact to bucket 0 used to cause a KeyError, but it should work
self.table.addContact(
lbrynet.dht.contact.Contact(long(lbrynet.dht.constants.k + 2), '127.0.0.1', 9999, None))
# import math
# print ""
# for i, bucket in enumerate(self.table._buckets):
# print "Bucket " + str(i) + " (2 ** " + str(
# math.log(bucket.rangeMin, 2) if bucket.rangeMin > 0 else 0) + " <= x < 2 ** "+str(
# math.log(bucket.rangeMax, 2)) + ")"
# for c in bucket.getContacts():
# print " contact " + str(c.id)
# for key, bucket in self.table._replacementCache.iteritems():
# print "Replacement Cache for Bucket " + str(key)
# for c in bucket:
# print " contact " + str(c.id)
# class KeyErrorFixedTest(unittest.TestCase):
# """ Basic tests case for boolean operators on the Contact class """
#
# def setUp(self):
# own_id = (2 ** constants.key_bits) - 1
# # carefully chosen own_id. here's the logic
# # we want a bunch of buckets (k+1, to be exact), and we want to make sure own_id
# # is not in bucket 0. so we put own_id at the end so we can keep splitting by adding to the
# # end
#
# self.table = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(own_id)
#
# def fill_bucket(self, bucket_min):
# bucket_size = lbrynet.dht.constants.k
# for i in range(bucket_min, bucket_min + bucket_size):
# self.table.addContact(lbrynet.dht.contact.Contact(long(i), '127.0.0.1', 9999, None))
#
# def overflow_bucket(self, bucket_min):
# bucket_size = lbrynet.dht.constants.k
# self.fill_bucket(bucket_min)
# self.table.addContact(
# lbrynet.dht.contact.Contact(long(bucket_min + bucket_size + 1),
# '127.0.0.1', 9999, None))
#
# def testKeyError(self):
#
# # find middle, so we know where bucket will split
# bucket_middle = self.table._buckets[0].rangeMax / 2
#
# # fill last bucket
# self.fill_bucket(self.table._buckets[0].rangeMax - lbrynet.dht.constants.k - 1)
# # -1 in previous line because own_id is in last bucket
#
# # fill/overflow 7 more buckets
# bucket_start = 0
# for i in range(0, lbrynet.dht.constants.k):
# self.overflow_bucket(bucket_start)
# bucket_start += bucket_middle / (2 ** i)
#
# # replacement cache now has k-1 entries.
# # adding one more contact to bucket 0 used to cause a KeyError, but it should work
# self.table.addContact(
# lbrynet.dht.contact.Contact(long(lbrynet.dht.constants.k + 2), '127.0.0.1', 9999, None))
#
# # import math
# # print ""
# # for i, bucket in enumerate(self.table._buckets):
# # print "Bucket " + str(i) + " (2 ** " + str(
# # math.log(bucket.rangeMin, 2) if bucket.rangeMin > 0 else 0) + " <= x < 2 ** "+str(
# # math.log(bucket.rangeMax, 2)) + ")"
# # for c in bucket.getContacts():
# # print " contact " + str(c.id)
# # for key, bucket in self.table._replacementCache.iteritems():
# # print "Replacement Cache for Bucket " + str(key)
# # for c in bucket:
# # print " contact " + str(c.id)
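The testDistance vectors above (Distance('12345')('98765') == 34527773184L, and zero for identical keys) are just the Kademlia XOR metric: read both byte strings as big-endian integers and XOR them. A standalone check that reproduces those vectors using only the standard library:

def xor_distance(key_a, key_b):
    # interpret each byte string as a big-endian integer, then XOR
    return long(key_a.encode('hex'), 16) ^ long(key_b.encode('hex'), 16)


assert xor_distance('123456789', '123456789') == 0
assert xor_distance('12345', '98765') == 34527773184L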

View file

@@ -5,20 +5,11 @@ import os
import tempfile
import shutil
import mock
import logging
from lbrynet.dht.encoding import Bencode
from lbrynet.dht.error import DecodeError
from lbrynet.dht.msgformat import DefaultFormat
from lbrynet.dht.msgtypes import ResponseMessage, RequestMessage, ErrorMessage
_encode = Bencode()
_datagram_formatter = DefaultFormat()
DEFAULT_TIMESTAMP = datetime.datetime(2016, 1, 1)
DEFAULT_ISO_TIME = time.mktime(DEFAULT_TIMESTAMP.timetuple())
log = logging.getLogger("lbrynet.tests.util")
def mk_db_and_blob_dir():
db_dir = tempfile.mkdtemp()
@@ -49,28 +40,5 @@ def resetTime(test_case, timestamp=DEFAULT_TIMESTAMP):
patcher.start().return_value = timestamp
test_case.addCleanup(patcher.stop)
def is_android():
return 'ANDROID_ARGUMENT' in os.environ # detect Android using the Kivy way
def debug_kademlia_packet(data, source, destination, node):
if log.level != logging.DEBUG:
return
try:
packet = _datagram_formatter.fromPrimitive(_encode.decode(data))
if isinstance(packet, RequestMessage):
log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request,
node.clock.seconds())
elif isinstance(packet, ResponseMessage):
if isinstance(packet.response, (str, unicode)):
log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response,
node.clock.seconds())
else:
log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0],
len(packet.response), node.clock.seconds())
elif isinstance(packet, ErrorMessage):
log.error("error %s <-- %s %s (node time %s)", destination[0], source[0], packet.exceptionType,
node.clock.seconds())
except DecodeError:
log.exception("decode error %s --> %s (node time %s)", source[0], destination[0], node.clock.seconds())