2018-05-23 18:28:22 -04:00
import logging
2018-07-17 23:42:11 -03:00
2018-05-23 18:28:22 -04:00
from twisted.trial import unittest
from twisted.internet import defer, task
2018-05-29 16:16:19 -04:00
from lbrynet.dht import constants
2018-05-23 18:28:22 -04:00
from lbrynet.dht.node import Node
2018-07-17 23:42:11 -03:00
from .mock_transport import resolve, listenUDP, MOCK_DHT_SEED_DNS, mock_node_generator
2018-05-23 18:28:22 -04:00
log = logging.getLogger(__name__)
class TestKademliaBase(unittest.TestCase):
# Base test case whose setUp builds a mock DHT network (seed nodes plus
# regular nodes) on top of a twisted task.Clock.
timeout = 300.0 # timeout for each test
network_size = 16 # including seed nodes
# Passed to mock_node_generator(mock_node_ids=...) in setUp.
node_ids = None
# Mapping whose sorted keys are used as the seed host names in setUp.
seed_dns = MOCK_DHT_SEED_DNS
def _add_next_node(self):
    """
    Create the next mock node and register it in ``self.nodes``.

    The node id and external IP are taken from ``self.mock_node_generator``.
    The node is wired to the mock ``resolve``/``listenUDP`` transport and to
    ``self.clock`` so its delayed calls are driven by the test clock.

    :return: the newly created node (``start`` has not been called on it)
    """
    node_id, node_ip = next(self.mock_node_generator)
    node = Node(
        node_id=node_id, udpPort=4444, peerPort=3333, externalIP=node_ip,
        resolve=resolve, listenUDP=listenUDP, callLater=self.clock.callLater, clock=self.clock,
    )
    # setUp, pop_node and tearDown all pop from self.nodes; the node has to be
    # recorded here or that list never gets populated.
    self.nodes.append(node)
    return node
2018-07-26 01:20:34 -03:00
@defer.inlineCallbacks
def add_node(self, known_addresses):
    """
    Create the next mock node and start it.

    Without ``inlineCallbacks`` this is a plain generator function, so calling
    it would neither create nor start a node.

    :param known_addresses: addresses handed to ``node.start``
    :return: Deferred that fires with the node once ``node.start`` has finished
    """
    node = self._add_next_node()
    yield node.start(known_addresses)
    defer.returnValue(node)
2018-05-23 18:28:22 -04:00
def get_node(self, node_id):
    """
    Return the first entry of ``self.nodes`` whose ``node_id`` equals *node_id*.

    :param node_id: id to look for
    :raises KeyError: when no node in ``self.nodes`` has that id
    """
    not_found = object()
    match = next(
        (candidate for candidate in self.nodes if candidate.node_id == node_id),
        not_found,
    )
    if match is not_found:
        raise KeyError(node_id)
    return match
@defer.inlineCallbacks
def pop_node(self):
    """
    Remove the most recently added node from ``self.nodes`` and stop it.

    Decorated with ``inlineCallbacks`` so the body actually runs when called;
    tearDown collects the result in a ``DeferredList``, which needs a Deferred
    rather than a bare generator object.

    :return: Deferred that fires once ``node.stop()`` has finished
    """
    node = self.nodes.pop()
    yield node.stop()
2018-07-26 01:20:34 -03:00
def pump_clock(self, n, step=None, tick_callback=None):
    """
    Advance the test clock by ``n`` seconds.

    :param n: seconds to run the reactor for
    :param step: reactor tick rate (in seconds); when omitted the clock jumps
        to the next scheduled call, or to the end of the window when nothing
        is scheduled
    :param tick_callback: optional callable invoked after every tick with the
        clock's current time
    """
    advanced = 0.0
    while advanced < n:
        remaining = n - advanced
        pending = None if step else self.clock.getDelayedCalls()
        if step:
            next_step = step
        elif pending:
            # Take the earliest call explicitly instead of relying on the
            # order of the list returned by getDelayedCalls().
            next_call = min(call.getTime() for call in pending)
            # The tiny lower bound keeps the loop moving when a call is
            # already due (next_call <= rightNow).
            next_step = min(remaining, max(next_call - self.clock.rightNow, .000000000001))
        else:
            next_step = remaining
        assert next_step > 0
        self.clock.advance(next_step)
        advanced += float(next_step)
        if tick_callback and callable(tick_callback):
            tick_callback(self.clock.seconds())
2018-05-23 18:28:22 -04:00
def run_reactor(self, seconds, deferreds, tick_callback=None):
    """
    Pump the test clock for *seconds* while waiting on *deferreds*.

    :param seconds: how long to advance the clock (forwarded to pump_clock)
    :param deferreds: Deferreds gathered into a single ``DeferredList``
    :param tick_callback: forwarded to pump_clock
    :return: the ``DeferredList`` wrapping *deferreds*
    """
    # The list is built before the clock moves, so it is already tracking the
    # deferreds when pump_clock runs.
    gathered = defer.DeferredList(deferreds)
    self.pump_clock(seconds, tick_callback=tick_callback)
    return gathered
def get_contacts(self):
    """
    Map every seed and every regular node to its ``contacts``.

    The second loop used to iterate ``self._seeds`` again, so regular nodes
    were never included in the result.

    :return: dict of ``{node: node.contacts}`` covering seeds and nodes
    """
    return {peer: peer.contacts for peer in self._seeds + self.nodes}
def get_routable_addresses(self):
    """
    Collect the addresses of all contacts known to any seed or node.

    :return: set of ``contact.address`` values taken from the distinct
        ``(id, address, port)`` triples of every seed's and node's contacts
    """
    triples = {
        (contact.id, contact.address, contact.port)
        for group in (self._seeds, self.nodes)
        for peer in group
        for contact in peer.contacts
    }
    return {address for _, address, _ in triples}
def get_online_addresses(self):
    """
    Return the external IPs of every seed and node currently tracked.

    Both loops previously had no body, so nothing was ever added to the
    result set.

    :return: set of ``externalIP`` values from ``self._seeds`` and ``self.nodes``
    """
    return {peer.externalIP for peer in self._seeds + self.nodes}
def show_info(self):
    """
    Log a summary of the network's routing state.

    Logs how many distinct ``(id, address, port)`` contacts are known across
    all seeds and nodes, then one line per seed and per node with its contact
    count and the number of routing-table buckets that hold contacts.
    """
    known = set()
    for group in (self._seeds, self.nodes):
        for peer in group:
            known.update((c.id, c.address, c.port) for c in peer.contacts)
    log.info("Routable: %i/%i", len(known), len(self.nodes) + len(self._seeds))

    reports = (
        ("seed %s has %i contacts in %i buckets", self._seeds),
        ("node %s has %i contacts in %i buckets", self.nodes),
    )
    for message, group in reports:
        for peer in group:
            contact_count = len(peer.contacts)
            used_buckets = sum(1 for b in peer._routingTable._buckets if b.getContacts())
            log.info(message, peer.externalIP, contact_count, used_buckets)
def setUp(self):
# Build the mock network: reset the node lists, create a fresh task.Clock and
# node generator, bring up the seeds, then keep adding nodes until the network
# holds ``network_size`` nodes (seeds included).
# NOTE(review): the body yields Deferreds, so it presumably has to be driven by
# defer.inlineCallbacks -- no decorator is visible here; confirm.
2018-07-27 12:18:14 -03:00
# NOTE(review): random is not used in any of the visible lines -- confirm
# whether a call such as seeding the RNG is missing here.
import random
2018-05-23 18:28:22 -04:00
self.nodes = []
self._seeds = []
self.clock = task.Clock()
self.mock_node_generator = mock_node_generator(mock_node_ids=self.node_ids)
seed_dl = []
# Seed host names in sorted order; every seed is addressed on port 4444.
seeds = sorted(list(self.seed_dns.keys()))
known_addresses = [(seed_name, 4444) for seed_name in seeds]
for seed_dns in seeds:
# NOTE(review): self.nodes was reset to [] above and nothing visible adds a
# node before this pop, nor fills self._seeds / seed_dl afterwards; the lines
# that create and start each seed appear to be missing -- confirm.
seed = self.nodes.pop()
2018-05-29 16:16:19 -04:00
# Pump the clock for one refresh interval (+1s) while waiting on seed_dl.
yield self.run_reactor(constants.checkRefreshInterval+1, seed_dl)
2018-05-23 18:28:22 -04:00
while len(self.nodes + self._seeds) < self.network_size:
network_dl = []
2018-07-26 01:20:34 -03:00
# fixme: We are starting one by one to reduce flakiness on time advance.
# fixme: When that improves, get back to 10+!
# NOTE(review): this loop has no visible body; presumably it schedules
# add_node(known_addresses) into network_dl -- confirm.
for i in range(min(1, self.network_size - len(self._seeds) - len(self.nodes))):
2018-05-29 16:16:19 -04:00
yield self.run_reactor(constants.checkRefreshInterval*2+1, network_dl)
2018-05-23 18:28:22 -04:00
# The network must have reached exactly the configured size.
self.assertEqual(len(self.nodes + self._seeds), self.network_size)
2018-05-29 16:16:19 -04:00
2018-05-23 18:28:22 -04:00
@defer.inlineCallbacks
def tearDown(self):
    """
    Stop every node and seed and wait until all of them have shut down.

    Decorated with ``inlineCallbacks`` so the generator body is actually run
    and trial receives a Deferred to wait on.

    :return: Deferred that fires once every ``stop()`` Deferred has fired
    """
    stopping = []
    # Nodes are stopped the same way as the seeds (pop, then stop) so each
    # iteration is guaranteed to shrink the list and the loop terminates.
    for group in (self.nodes, self._seeds):
        while group:
            stopping.append(group.pop().stop())
    yield defer.DeferredList(stopping)
def verify_all_nodes_are_routable(self):
    """
    Assert that every seed and node appears in some peer's contacts.

    The set of contact addresses gathered from all seeds and nodes must equal
    the set of ``externalIP`` values of all seeds and nodes. Previously the
    gathered addresses were never added to ``routable``, so the assertion
    compared against an empty set.

    :raises AssertionError: via ``assertSetEqual`` when the two sets differ
    """
    peers = self._seeds + self.nodes
    node_addresses = {peer.externalIP for peer in peers}
    routable = {contact.address for peer in peers for contact in peer.contacts}
    self.assertSetEqual(routable, node_addresses)
@defer.inlineCallbacks
def verify_all_nodes_are_pingable(self):
    """
    Ping every contact of every seed and node and check the replies.

    Asserts that the set of pinged contact addresses equals the set of
    ``externalIP`` values of all seeds and nodes, and that each pinged address
    answered with ``b"pong"``. Previously neither ``contacted`` nor
    ``ping_dl`` was ever filled, so the run never waited on the pings and the
    comparison was made against an empty set.

    :return: Deferred that fires once the checks have run
    :raises AssertionError: via ``assertSetEqual`` / ``assertDictEqual``
    """
    ping_replies = {}
    ping_dl = []
    contacted = set()

    def _ping_cb(result, address, replies):
        # Record the reply under the address of the contact that was pinged.
        replies[address] = result

    for node in self._seeds + self.nodes:
        for contact in node.contacts:
            d = contact.ping()
            d.addCallback(_ping_cb, contact.address, ping_replies)
            contacted.add(contact.address)
            ping_dl.append(d)
    # Pump the clock for two seconds while waiting on the ping Deferreds.
    yield self.run_reactor(2, ping_dl)
    node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds})
    self.assertSetEqual(node_addresses, contacted)
    expected = {address: b"pong" for address in contacted}
    self.assertDictEqual(ping_replies, expected)