forked from LBRYCommunity/lbry-sdk
125 lines
5.6 KiB
Python
125 lines
5.6 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# This library is free software, distributed under the terms of
|
|
# the GNU Lesser General Public License Version 3, or any later version.
|
|
# See the COPYING file included in this archive
|
|
|
|
from twisted.trial import unittest
|
|
import struct
|
|
from lbrynet.core.utils import generate_id
|
|
from lbrynet.dht import kbucket
|
|
from lbrynet.dht.contact import ContactManager
|
|
from lbrynet.dht import constants
|
|
|
|
|
|
def address_generator(address=(10, 42, 42, 1)):
|
|
def increment(addr):
|
|
value = struct.unpack("I", "".join([chr(x) for x in list(addr)[::-1]]).encode())[0] + 1
|
|
new_addr = []
|
|
for i in range(4):
|
|
new_addr.append(value % 256)
|
|
value >>= 8
|
|
return tuple(new_addr[::-1])
|
|
|
|
while True:
|
|
yield "{}.{}.{}.{}".format(*address)
|
|
address = increment(address)
|
|
|
|
|
|
class KBucketTest(unittest.TestCase):
|
|
""" Test case for the KBucket class """
|
|
def setUp(self):
|
|
self.address_generator = address_generator()
|
|
self.contact_manager = ContactManager()
|
|
self.kbucket = kbucket.KBucket(0, 2**constants.key_bits, generate_id())
|
|
|
|
def testAddContact(self):
|
|
""" Tests if the bucket handles contact additions/updates correctly """
|
|
# Test if contacts can be added to empty list
|
|
# Add k contacts to bucket
|
|
for i in range(constants.k):
|
|
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
|
|
self.kbucket.addContact(tmpContact)
|
|
self.assertEqual(
|
|
self.kbucket._contacts[i],
|
|
tmpContact,
|
|
"Contact in position %d not the same as the newly-added contact" % i)
|
|
|
|
# Test if contact is not added to full list
|
|
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
|
|
self.assertRaises(kbucket.BucketFull, self.kbucket.addContact, tmpContact)
|
|
|
|
# Test if an existing contact is updated correctly if added again
|
|
existingContact = self.kbucket._contacts[0]
|
|
self.kbucket.addContact(existingContact)
|
|
self.assertEqual(
|
|
self.kbucket._contacts.index(existingContact),
|
|
len(self.kbucket._contacts)-1,
|
|
'Contact not correctly updated; it should be at the end of the list of contacts')
|
|
|
|
def testGetContacts(self):
|
|
# try and get 2 contacts from empty list
|
|
result = self.kbucket.getContacts(2)
|
|
self.assertFalse(len(result) != 0, "Returned list should be empty; returned list length: %d" %
|
|
(len(result)))
|
|
|
|
|
|
# Add k-2 contacts
|
|
node_ids = []
|
|
if constants.k >= 2:
|
|
for i in range(constants.k-2):
|
|
node_ids.append(generate_id())
|
|
tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0,
|
|
None)
|
|
self.kbucket.addContact(tmpContact)
|
|
else:
|
|
# add k contacts
|
|
for i in range(constants.k):
|
|
node_ids.append(generate_id())
|
|
tmpContact = self.contact_manager.make_contact(node_ids[-1], next(self.address_generator), 4444, 0,
|
|
None)
|
|
self.kbucket.addContact(tmpContact)
|
|
|
|
# try to get too many contacts
|
|
# requested count greater than bucket size; should return at most k contacts
|
|
contacts = self.kbucket.getContacts(constants.k+3)
|
|
self.assertTrue(len(contacts) <= constants.k,
|
|
'Returned list should not have more than k entries!')
|
|
|
|
# verify returned contacts in list
|
|
for node_id, i in zip(node_ids, range(constants.k-2)):
|
|
self.assertFalse(self.kbucket._contacts[i].id != node_id,
|
|
"Contact in position %s not same as added contact" % (str(i)))
|
|
|
|
# try to get too many contacts
|
|
# requested count one greater than number of contacts
|
|
if constants.k >= 2:
|
|
result = self.kbucket.getContacts(constants.k-1)
|
|
self.assertFalse(len(result) != constants.k-2,
|
|
"Too many contacts in returned list %s - should be %s" %
|
|
(len(result), constants.k-2))
|
|
else:
|
|
result = self.kbucket.getContacts(constants.k-1)
|
|
# if the count is <= 0, it should return all of it's contats
|
|
self.assertFalse(len(result) != constants.k,
|
|
"Too many contacts in returned list %s - should be %s" %
|
|
(len(result), constants.k-2))
|
|
result = self.kbucket.getContacts(constants.k-3)
|
|
self.assertFalse(len(result) != constants.k-3,
|
|
"Too many contacts in returned list %s - should be %s" %
|
|
(len(result), constants.k-3))
|
|
|
|
def testRemoveContact(self):
|
|
# try remove contact from empty list
|
|
rmContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
|
|
self.assertRaises(ValueError, self.kbucket.removeContact, rmContact)
|
|
|
|
# Add couple contacts
|
|
for i in range(constants.k-2):
|
|
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
|
|
self.kbucket.addContact(tmpContact)
|
|
|
|
# try remove contact from empty list
|
|
self.kbucket.addContact(rmContact)
|
|
result = self.kbucket.removeContact(rmContact)
|
|
self.assertFalse(rmContact in self.kbucket._contacts, "Could not remove contact from bucket")
|