lbry-sdk/tests/unit/dht/test_kbucket.py

126 lines
5.6 KiB
Python
Raw Normal View History

2015-08-20 17:27:15 +02:00
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
from twisted.trial import unittest
import struct
from lbrynet.core.utils import generate_id
2015-08-20 17:27:15 +02:00
from lbrynet.dht import kbucket
from lbrynet.dht.contact import ContactManager
2015-08-20 17:27:15 +02:00
from lbrynet.dht import constants
def address_generator(address=(10, 42, 42, 1)):
def increment(addr):
2018-07-18 02:45:51 +02:00
value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]).encode())[0] + 1
new_addr = []
for i in range(4):
new_addr.append(value % 256)
value >>= 8
return tuple(new_addr[::-1])
while True:
yield "{}.{}.{}.{}".format(*address)
address = increment(address)
2015-08-20 17:27:15 +02:00
class KBucketTest(unittest.TestCase):
""" Test case for the KBucket class """
def setUp(self):
self.address_generator = address_generator()
self.contact_manager = ContactManager()
self.kbucket = kbucket.KBucket(0, 2**constants.key_bits, generate_id())
2015-08-20 17:27:15 +02:00
def testAddContact(self):
""" Tests if the bucket handles contact additions/updates correctly """
# Test if contacts can be added to empty list
# Add k contacts to bucket
for i in range(constants.k):
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
2015-08-20 17:27:15 +02:00
self.kbucket.addContact(tmpContact)
2018-07-21 22:55:43 +02:00
self.assertEqual(
2017-09-29 12:44:22 +02:00
self.kbucket._contacts[i],
tmpContact,
"Contact in position %d not the same as the newly-added contact" % i)
2015-08-20 17:27:15 +02:00
# Test if contact is not added to full list
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
2018-07-21 22:55:43 +02:00
self.assertRaises(kbucket.BucketFull, self.kbucket.addContact, tmpContact)
2017-09-29 12:44:22 +02:00
2015-08-20 17:27:15 +02:00
# Test if an existing contact is updated correctly if added again
existingContact = self.kbucket._contacts[0]
self.kbucket.addContact(existingContact)
2018-07-21 22:55:43 +02:00
self.assertEqual(
2017-09-29 12:44:22 +02:00
self.kbucket._contacts.index(existingContact),
len(self.kbucket._contacts)-1,
'Contact not correctly updated; it should be at the end of the list of contacts')
2015-08-20 17:27:15 +02:00
def testGetContacts(self):
# try and get 2 contacts from empty list
result = self.kbucket.getContacts(2)
2018-07-21 22:55:43 +02:00
self.assertFalse(len(result) != 0, "Returned list should be empty; returned list length: %d" %
2017-09-29 12:44:22 +02:00
(len(result)))
2015-08-20 17:27:15 +02:00
# Add k-2 contacts
node_ids = []
2015-08-20 17:27:15 +02:00
if constants.k >= 2:
for i in range(constants.k-2):
node_ids.append(generate_id())
2018-05-24 01:33:16 +02:00
tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0,
None)
2015-08-20 17:27:15 +02:00
self.kbucket.addContact(tmpContact)
else:
# add k contacts
for i in range(constants.k):
node_ids.append(generate_id())
2018-05-24 01:33:16 +02:00
tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0,
None)
2015-08-20 17:27:15 +02:00
self.kbucket.addContact(tmpContact)
# try to get too many contacts
# requested count greater than bucket size; should return at most k contacts
contacts = self.kbucket.getContacts(constants.k+3)
2018-07-21 22:55:43 +02:00
self.assertTrue(len(contacts) <= constants.k,
2017-09-29 12:44:22 +02:00
'Returned list should not have more than k entries!')
2015-08-20 17:27:15 +02:00
# verify returned contacts in list
for node_id, i in zip(node_ids, range(constants.k-2)):
2018-07-21 22:55:43 +02:00
self.assertFalse(self.kbucket._contacts[i].id != node_id,
2017-09-29 12:44:22 +02:00
"Contact in position %s not same as added contact" % (str(i)))
2015-08-20 17:27:15 +02:00
# try to get too many contacts
# requested count one greater than number of contacts
if constants.k >= 2:
result = self.kbucket.getContacts(constants.k-1)
2018-07-21 22:55:43 +02:00
self.assertFalse(len(result) != constants.k-2,
2017-09-29 12:44:22 +02:00
"Too many contacts in returned list %s - should be %s" %
(len(result), constants.k-2))
2015-08-20 17:27:15 +02:00
else:
result = self.kbucket.getContacts(constants.k-1)
# if the count is <= 0, it should return all of it's contats
2018-07-21 22:55:43 +02:00
self.assertFalse(len(result) != constants.k,
2017-09-29 12:44:22 +02:00
"Too many contacts in returned list %s - should be %s" %
(len(result), constants.k-2))
2015-08-20 17:27:15 +02:00
result = self.kbucket.getContacts(constants.k-3)
2018-07-21 22:55:43 +02:00
self.assertFalse(len(result) != constants.k-3,
2017-09-29 12:44:22 +02:00
"Too many contacts in returned list %s - should be %s" %
(len(result), constants.k-3))
2015-08-20 17:27:15 +02:00
def testRemoveContact(self):
# try remove contact from empty list
rmContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
2018-07-21 22:55:43 +02:00
self.assertRaises(ValueError, self.kbucket.removeContact, rmContact)
2017-09-29 12:44:22 +02:00
2015-08-20 17:27:15 +02:00
# Add couple contacts
for i in range(constants.k-2):
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
2015-08-20 17:27:15 +02:00
self.kbucket.addContact(tmpContact)
# try remove contact from empty list
self.kbucket.addContact(rmContact)
result = self.kbucket.removeContact(rmContact)
2018-07-21 22:55:43 +02:00
self.assertFalse(rmContact in self.kbucket._contacts, "Could not remove contact from bucket")