assertEquals() -> assertEqual()

This commit is contained in:
Lex Berezhny 2018-07-21 16:55:43 -04:00 committed by Jack Robison
parent 24a872885a
commit fbdbcc8070
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
12 changed files with 117 additions and 117 deletions

View file

@ -35,6 +35,6 @@ class TestPeerExpiration(TestKademliaBase):
# run the network long enough for two failures to happen
self.pump_clock(constants.checkRefreshInterval * 3)
self.assertEquals(len(get_nodes_with_stale_contacts()), 0)
self.assertEqual(len(get_nodes_with_stale_contacts()), 0)
self.verify_all_nodes_are_routable()
self.verify_all_nodes_are_pingable()

View file

@ -65,7 +65,7 @@ class KademliaProtocolTest(unittest.TestCase):
self.node.ping = fake_ping
# Make sure the contact was added
self.failIf(self.remote_contact not in self.node.contacts,
self.assertFalse(self.remote_contact not in self.node.contacts,
'Contact not added to fake node (error in test code)')
self.node.start_listening()
@ -84,7 +84,7 @@ class KademliaProtocolTest(unittest.TestCase):
# See if the contact was removed due to the timeout
def check_removed_contact():
self.failIf(self.remote_contact in self.node.contacts,
self.assertFalse(self.remote_contact in self.node.contacts,
'Contact was not removed after RPC timeout; check exception types.')
df.addCallback(lambda _: reset_values())
@ -118,9 +118,9 @@ class KademliaProtocolTest(unittest.TestCase):
self._reactor.advance(2)
yield df
self.failIf(self.error, self.error)
self.assertFalse(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
self.assertEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
@ -154,9 +154,9 @@ class KademliaProtocolTest(unittest.TestCase):
df.addCallback(handleResult)
df.addErrback(handleError)
self._reactor.pump([1 for _ in range(10)])
self.failIf(self.error, self.error)
self.assertFalse(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.node._protocol._sentMessages), 0,
self.assertEqual(len(self.node._protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
@ -175,21 +175,21 @@ class KademliaProtocolTest(unittest.TestCase):
d = self.remote_contact.findValue(fake_blob)
self._reactor.advance(3)
find_value_response = yield d
self.assertEquals(self.remote_contact.protocolVersion, 0)
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue('protocolVersion' not in find_value_response)
self.remote_node.findValue = original_findvalue
d = self.remote_contact.findValue(fake_blob)
self._reactor.advance(3)
find_value_response = yield d
self.assertEquals(self.remote_contact.protocolVersion, 1)
self.assertEqual(self.remote_contact.protocolVersion, 1)
self.assertTrue('protocolVersion' not in find_value_response)
self.remote_node.findValue = findValue
d = self.remote_contact.findValue(fake_blob)
self._reactor.advance(3)
find_value_response = yield d
self.assertEquals(self.remote_contact.protocolVersion, 0)
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue('protocolVersion' not in find_value_response)
@defer.inlineCallbacks
@ -227,16 +227,16 @@ class KademliaProtocolTest(unittest.TestCase):
d = self.remote_contact.findValue(fake_blob)
self._reactor.advance(3)
find_value_response = yield d
self.assertEquals(self.remote_contact.protocolVersion, 0)
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue('protocolVersion' not in find_value_response)
token = find_value_response['token']
d = self.remote_contact.store(fake_blob, token, 3333, self.node.node_id, 0)
self._reactor.advance(3)
response = yield d
self.assertEquals(response, "OK")
self.assertEquals(self.remote_contact.protocolVersion, 0)
self.assertEqual(response, "OK")
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue(self.remote_node._dataStore.hasPeersForBlob(fake_blob))
self.assertEquals(len(self.remote_node._dataStore.getStoringContacts()), 1)
self.assertEqual(len(self.remote_node._dataStore.getStoringContacts()), 1)
@defer.inlineCallbacks
def testStoreFromPre_0_20_0_Node(self):
@ -253,7 +253,7 @@ class KademliaProtocolTest(unittest.TestCase):
d = us_from_them.findValue(fake_blob)
self._reactor.advance(3)
find_value_response = yield d
self.assertEquals(self.remote_contact.protocolVersion, 0)
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue('protocolVersion' not in find_value_response)
token = find_value_response['token']
us_from_them.update_protocol_version(0)
@ -262,8 +262,8 @@ class KademliaProtocolTest(unittest.TestCase):
)
self._reactor.advance(3)
response = yield d
self.assertEquals(response, "OK")
self.assertEquals(self.remote_contact.protocolVersion, 0)
self.assertEqual(response, "OK")
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue(self.node._dataStore.hasPeersForBlob(fake_blob))
self.assertEquals(len(self.node._dataStore.getStoringContacts()), 1)
self.assertEqual(len(self.node._dataStore.getStoringContacts()), 1)
self.assertIs(self.node._dataStore.getStoringContacts()[0], self.remote_contact)

View file

@ -23,16 +23,16 @@ class TestStoreExpiration(TestKademliaBase):
# verify the nodes we think stored it did actually store it
storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids]
self.assertEquals(len(storing_nodes), len(storing_node_ids))
self.assertEquals(len(storing_nodes), constants.k)
self.assertEqual(len(storing_nodes), len(storing_node_ids))
self.assertEqual(len(storing_nodes), constants.k)
for node in storing_nodes:
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port),
self.assertEqual(map(lambda contact: (contact.id, contact.address, contact.port),
node._dataStore.getStoringContacts()), [(announcing_node.node_id,
announcing_node.externalIP,
announcing_node.port)])
self.assertEquals(len(datastore_result), 1)
self.assertEqual(len(datastore_result), 1)
expanded_peers = []
for peer in datastore_result:
host = ".".join([str(ord(d)) for d in peer[:4]])
@ -40,7 +40,7 @@ class TestStoreExpiration(TestKademliaBase):
peer_node_id = peer[6:]
if (host, port, peer_node_id) not in expanded_peers:
expanded_peers.append((peer_node_id, host, port))
self.assertEquals(expanded_peers[0],
self.assertEqual(expanded_peers[0],
(announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
# verify the announced blob expires in the storing nodes datastores
@ -49,16 +49,16 @@ class TestStoreExpiration(TestKademliaBase):
for node in storing_nodes:
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(len(datastore_result), 0)
self.assertEqual(len(datastore_result), 0)
self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet
self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
for node in storing_nodes:
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(len(datastore_result), 0)
self.assertEquals(len(node._dataStore.getStoringContacts()), 0)
self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired
@defer.inlineCallbacks
@ -73,16 +73,16 @@ class TestStoreExpiration(TestKademliaBase):
# verify the nodes we think stored it did actually store it
storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids]
self.assertEquals(len(storing_nodes), len(storing_node_ids))
self.assertEquals(len(storing_nodes), constants.k)
self.assertEqual(len(storing_nodes), len(storing_node_ids))
self.assertEqual(len(storing_nodes), constants.k)
for node in storing_nodes:
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port),
self.assertEqual(map(lambda contact: (contact.id, contact.address, contact.port),
node._dataStore.getStoringContacts()), [(announcing_node.node_id,
announcing_node.externalIP,
announcing_node.port)])
self.assertEquals(len(datastore_result), 1)
self.assertEqual(len(datastore_result), 1)
expanded_peers = []
for peer in datastore_result:
host = ".".join([str(ord(d)) for d in peer[:4]])
@ -90,7 +90,7 @@ class TestStoreExpiration(TestKademliaBase):
peer_node_id = peer[6:]
if (host, port, peer_node_id) not in expanded_peers:
expanded_peers.append((peer_node_id, host, port))
self.assertEquals(expanded_peers[0],
self.assertEqual(expanded_peers[0],
(announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
self.pump_clock(constants.checkRefreshInterval*2)
@ -107,8 +107,8 @@ class TestStoreExpiration(TestKademliaBase):
for node in storing_nodes:
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(len(datastore_result), 0)
self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.assertTrue(blob_hash in node._dataStore._dict)
# # bring the announcing node back online
@ -123,8 +123,8 @@ class TestStoreExpiration(TestKademliaBase):
for node in storing_nodes:
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(len(datastore_result), 1)
self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
self.assertEqual(len(datastore_result), 1)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.assertTrue(blob_hash in node._dataStore._dict)
# verify the announced blob expires in the storing nodes datastores
@ -132,14 +132,14 @@ class TestStoreExpiration(TestKademliaBase):
for node in storing_nodes:
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(len(datastore_result), 0)
self.assertEqual(len(datastore_result), 0)
self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet
self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
for node in storing_nodes:
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEquals(len(datastore_result), 0)
self.assertEquals(len(node._dataStore.getStoringContacts()), 0)
self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired

View file

@ -82,7 +82,7 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase):
offer2 = strategy.make_offer(peer, blobs)
self.assertEquals(offer1.rate, 0.0)
self.assertEqual(offer1.rate, 0.0)
self.assertNotEqual(offer2.rate, 0.0)
def test_accept_zero_and_persist_if_accepted(self):
@ -101,13 +101,13 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase):
response2 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.update_accepted_offers(host, response2)
self.assertEquals(response1.is_too_low, False)
self.assertEquals(response1.is_accepted, True)
self.assertEquals(response1.rate, 0.0)
self.assertEqual(response1.is_too_low, False)
self.assertEqual(response1.is_accepted, True)
self.assertEqual(response1.rate, 0.0)
self.assertEquals(response2.is_too_low, False)
self.assertEquals(response2.is_accepted, True)
self.assertEquals(response2.rate, 0.0)
self.assertEqual(response2.is_too_low, False)
self.assertEqual(response2.is_accepted, True)
self.assertEqual(response2.rate, 0.0)
def test_how_many_turns_before_accept_with_similar_rate_settings(self):
base_rates = [0.0001 * n for n in range(1, 10)]

View file

@ -42,7 +42,7 @@ class TestLogger(trial.unittest.TestCase):
# traceback will depend on the system the test is being run on
# but hopefully these two tests are good enough
d = self.triggerErrback()
d.addCallback(lambda _: self.assertEquals(expected_first_line, output_lines()[0]))
d.addCallback(lambda _: self.assertEqual(expected_first_line, output_lines()[0]))
d.addCallback(lambda _: self.assertEqual(10, len(output_lines())))
return d

View file

@ -30,13 +30,13 @@ class ContactOperatorsTest(unittest.TestCase):
def testBoolean(self):
""" Test "equals" and "not equals" comparisons """
self.failIfEqual(
self.assertNotEqual(
self.firstContact, self.secondContact,
'Contacts with different IDs should not be equal.')
self.failUnlessEqual(
self.assertEqual(
self.firstContact, self.firstContactDifferentValues,
'Contacts with same IDs should be equal, even if their other values differ.')
self.failUnlessEqual(
self.assertEqual(
self.secondContact, self.secondContactCopy,
'Different copies of the same Contact instance should be equal')
@ -44,10 +44,10 @@ class ContactOperatorsTest(unittest.TestCase):
""" Test comparisons with non-Contact and non-str types """
msg = '"{}" operator: Contact object should not be equal to {} type'
for item in (123, [1, 2, 3], {'key': 'value'}):
self.failIfEqual(
self.assertNotEqual(
self.firstContact, item,
msg.format('eq', type(item).__name__))
self.failUnless(
self.assertTrue(
self.firstContact != item,
msg.format('ne', type(item).__name__))

View file

@ -29,7 +29,7 @@ class BencodeTest(unittest.TestCase):
""" Tests the bencode encoder """
for value, encodedValue in self.cases:
result = self.encoding.encode(value)
self.failUnlessEqual(
self.assertEqual(
result, encodedValue,
'Value "%s" not correctly encoded! Expected "%s", got "%s"' %
(value, encodedValue, result))
@ -38,10 +38,10 @@ class BencodeTest(unittest.TestCase):
""" Tests the bencode decoder """
for value, encodedValue in self.cases:
result = self.encoding.decode(encodedValue)
self.failUnlessEqual(
self.assertEqual(
result, value,
'Value "%s" not correctly decoded! Expected "%s", got "%s"' %
(encodedValue, value, result))
for encodedValue in self.badDecoderCases:
self.failUnlessRaises(
self.assertRaises(
lbrynet.dht.encoding.DecodeError, self.encoding.decode, encodedValue)

View file

@ -40,19 +40,19 @@ class KBucketTest(unittest.TestCase):
for i in range(constants.k):
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
self.kbucket.addContact(tmpContact)
self.failUnlessEqual(
self.assertEqual(
self.kbucket._contacts[i],
tmpContact,
"Contact in position %d not the same as the newly-added contact" % i)
# Test if contact is not added to full list
tmpContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
self.failUnlessRaises(kbucket.BucketFull, self.kbucket.addContact, tmpContact)
self.assertRaises(kbucket.BucketFull, self.kbucket.addContact, tmpContact)
# Test if an existing contact is updated correctly if added again
existingContact = self.kbucket._contacts[0]
self.kbucket.addContact(existingContact)
self.failUnlessEqual(
self.assertEqual(
self.kbucket._contacts.index(existingContact),
len(self.kbucket._contacts)-1,
'Contact not correctly updated; it should be at the end of the list of contacts')
@ -60,7 +60,7 @@ class KBucketTest(unittest.TestCase):
def testGetContacts(self):
# try and get 2 contacts from empty list
result = self.kbucket.getContacts(2)
self.failIf(len(result) != 0, "Returned list should be empty; returned list length: %d" %
self.assertFalse(len(result) != 0, "Returned list should be empty; returned list length: %d" %
(len(result)))
@ -83,36 +83,36 @@ class KBucketTest(unittest.TestCase):
# try to get too many contacts
# requested count greater than bucket size; should return at most k contacts
contacts = self.kbucket.getContacts(constants.k+3)
self.failUnless(len(contacts) <= constants.k,
self.assertTrue(len(contacts) <= constants.k,
'Returned list should not have more than k entries!')
# verify returned contacts in list
for node_id, i in zip(node_ids, range(constants.k-2)):
self.failIf(self.kbucket._contacts[i].id != node_id,
self.assertFalse(self.kbucket._contacts[i].id != node_id,
"Contact in position %s not same as added contact" % (str(i)))
# try to get too many contacts
# requested count one greater than number of contacts
if constants.k >= 2:
result = self.kbucket.getContacts(constants.k-1)
self.failIf(len(result) != constants.k-2,
self.assertFalse(len(result) != constants.k-2,
"Too many contacts in returned list %s - should be %s" %
(len(result), constants.k-2))
else:
result = self.kbucket.getContacts(constants.k-1)
# if the count is <= 0, it should return all of it's contats
self.failIf(len(result) != constants.k,
self.assertFalse(len(result) != constants.k,
"Too many contacts in returned list %s - should be %s" %
(len(result), constants.k-2))
result = self.kbucket.getContacts(constants.k-3)
self.failIf(len(result) != constants.k-3,
self.assertFalse(len(result) != constants.k-3,
"Too many contacts in returned list %s - should be %s" %
(len(result), constants.k-3))
def testRemoveContact(self):
# try remove contact from empty list
rmContact = self.contact_manager.make_contact(generate_id(), next(self.address_generator), 4444, 0, None)
self.failUnlessRaises(ValueError, self.kbucket.removeContact, rmContact)
self.assertRaises(ValueError, self.kbucket.removeContact, rmContact)
# Add couple contacts
for i in range(constants.k-2):
@ -122,4 +122,4 @@ class KBucketTest(unittest.TestCase):
# try remove contact from empty list
self.kbucket.addContact(rmContact)
result = self.kbucket.removeContact(rmContact)
self.failIf(rmContact in self.kbucket._contacts, "Could not remove contact from bucket")
self.assertFalse(rmContact in self.kbucket._contacts, "Could not remove contact from bucket")

View file

@ -51,7 +51,7 @@ class DefaultFormatTranslatorTest(unittest.TestCase):
'127.0.0.1', 1921)]})
)
self.translator = DefaultFormat()
self.failUnless(
self.assertTrue(
isinstance(self.translator, MessageTranslator),
'Translator class must inherit from entangled.kademlia.msgformat.MessageTranslator!')
@ -59,10 +59,10 @@ class DefaultFormatTranslatorTest(unittest.TestCase):
""" Tests translation from a Message object to a primitive """
for msg, msgPrimitive in self.cases:
translatedObj = self.translator.toPrimitive(msg)
self.failUnlessEqual(len(translatedObj), len(msgPrimitive),
self.assertEqual(len(translatedObj), len(msgPrimitive),
"Translated object does not match example object's size")
for key in msgPrimitive:
self.failUnlessEqual(
self.assertEqual(
translatedObj[key], msgPrimitive[key],
'Message object type %s not translated correctly into primitive on '
'key "%s"; expected "%s", got "%s"' %
@ -72,12 +72,12 @@ class DefaultFormatTranslatorTest(unittest.TestCase):
""" Tests translation from a primitive to a Message object """
for msg, msgPrimitive in self.cases:
translatedObj = self.translator.fromPrimitive(msgPrimitive)
self.failUnlessEqual(
self.assertEqual(
type(translatedObj), type(msg),
'Message type incorrectly translated; expected "%s", got "%s"' %
(type(msg), type(translatedObj)))
for key in msg.__dict__:
self.failUnlessEqual(
self.assertEqual(
msg.__dict__[key], translatedObj.__dict__[key],
'Message instance variable "%s" not translated correctly; '
'expected "%s", got "%s"' %

View file

@ -20,8 +20,8 @@ class NodeIDTest(unittest.TestCase):
def testAutoCreatedID(self):
""" Tests if a new node has a valid node ID """
self.failUnlessEqual(type(self.node.node_id), bytes, 'Node does not have a valid ID')
self.failUnlessEqual(len(self.node.node_id), 48, 'Node ID length is incorrect! '
self.assertEqual(type(self.node.node_id), bytes, 'Node does not have a valid ID')
self.assertEqual(len(self.node.node_id), 48, 'Node ID length is incorrect! '
'Expected 384 bits, got %d bits.' %
(len(self.node.node_id) * 8))
@ -31,7 +31,7 @@ class NodeIDTest(unittest.TestCase):
for i in range(100):
newID = self.node._generateID()
# ugly uniqueness test
self.failIf(newID in generatedIDs, 'Generated ID #%d not unique!' % (i+1))
self.assertFalse(newID in generatedIDs, 'Generated ID #%d not unique!' % (i+1))
generatedIDs.append(newID)
def testKeyLength(self):
@ -39,7 +39,7 @@ class NodeIDTest(unittest.TestCase):
for i in range(20):
id = self.node._generateID()
# Key length: 20 bytes == 160 bits
self.failUnlessEqual(len(id), 48,
self.assertEqual(len(id), 48,
'Length of generated ID is incorrect! Expected 384 bits, '
'got %d bits.' % (len(id)*8))
@ -68,9 +68,9 @@ class NodeDataTest(unittest.TestCase):
for key, value in self.cases:
expected_result = self.contact.compact_ip() + struct.pack('>H', value) + \
self.contact.id
self.failUnless(self.node._dataStore.hasPeersForBlob(key),
self.assertTrue(self.node._dataStore.hasPeersForBlob(key),
'Stored key not found in node\'s DataStore: "%s"' % key)
self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key),
self.assertTrue(expected_result in self.node._dataStore.getPeersForBlob(key),
'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s'
% (key, value, self.node._dataStore.getPeersForBlob(key)))
@ -92,9 +92,9 @@ class NodeContactTest(unittest.TestCase):
yield self.node.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(contactID, constants.k)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; '
self.assertEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; '
'expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing '
self.assertTrue(contact in closestNodes, 'Added contact not found by issueing '
'_findCloseNodes()')
@defer.inlineCallbacks
@ -107,7 +107,7 @@ class NodeContactTest(unittest.TestCase):
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(self.node.node_id,
constants.k)
self.failIf(contact in closestNodes, 'Node added itself as a contact')
self.assertFalse(contact in closestNodes, 'Node added itself as a contact')
# class FakeRPCProtocol(protocol.DatagramProtocol):

View file

@ -36,7 +36,7 @@ class TreeRoutingTableTest(unittest.TestCase):
for test in basicTestList:
result = Distance(test[0])(test[1])
self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' %
self.assertFalse(result != test[2], 'Result of _distance() should be %s but %s returned' %
(test[2], result))
@defer.inlineCallbacks
@ -51,9 +51,9 @@ class TreeRoutingTableTest(unittest.TestCase):
yield self.routingTable.addContact(contact)
# ...and request the closest nodes to it (will retrieve it)
closestNodes = self.routingTable.findCloseNodes(contactID)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,'
self.assertEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,'
' got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing '
self.assertTrue(contact in closestNodes, 'Added contact not found by issueing '
'_findCloseNodes()')
@defer.inlineCallbacks
@ -67,7 +67,7 @@ class TreeRoutingTableTest(unittest.TestCase):
yield self.routingTable.addContact(contact)
# ...and get it again
sameContact = self.routingTable.getContact(contactID)
self.failUnlessEqual(contact, sameContact, 'getContact() should return the same contact')
self.assertEqual(contact, sameContact, 'getContact() should return the same contact')
@defer.inlineCallbacks
def testAddParentNodeAsContact(self):
@ -81,7 +81,7 @@ class TreeRoutingTableTest(unittest.TestCase):
yield self.routingTable.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.routingTable.findCloseNodes(self.nodeID, constants.k)
self.failIf(contact in closestNodes, 'Node added itself as a contact')
self.assertFalse(contact in closestNodes, 'Node added itself as a contact')
@defer.inlineCallbacks
def testRemoveContact(self):
@ -94,15 +94,15 @@ class TreeRoutingTableTest(unittest.TestCase):
# Now add it...
yield self.routingTable.addContact(contact)
# Verify addition
self.failUnlessEqual(len(self.routingTable._buckets[0]), 1, 'Contact not added properly')
self.assertEqual(len(self.routingTable._buckets[0]), 1, 'Contact not added properly')
# Now remove it
self.routingTable.removeContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets[0]), 0, 'Contact not removed properly')
self.assertEqual(len(self.routingTable._buckets[0]), 0, 'Contact not removed properly')
@defer.inlineCallbacks
def testSplitBucket(self):
""" Tests if the the routing table correctly dynamically splits k-buckets """
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**384,
self.assertEqual(self.routingTable._buckets[0].rangeMax, 2**384,
'Initial k-bucket range should be 0 <= range < 2**384')
# Add k contacts
for i in range(constants.k):
@ -111,7 +111,7 @@ class TreeRoutingTableTest(unittest.TestCase):
nodeID = h.digest()
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 9182, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1,
self.assertEqual(len(self.routingTable._buckets), 1,
'Only k nodes have been added; the first k-bucket should now '
'be full, but should not yet be split')
# Now add 1 more contact
@ -120,15 +120,15 @@ class TreeRoutingTableTest(unittest.TestCase):
nodeID = h.digest()
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 9182, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 2,
self.assertEqual(len(self.routingTable._buckets), 2,
'k+1 nodes have been added; the first k-bucket should have been '
'split into two new buckets')
self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**384,
self.assertNotEqual(self.routingTable._buckets[0].rangeMax, 2**384,
'K-bucket was split, but its range was not properly adjusted')
self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**384,
self.assertEqual(self.routingTable._buckets[1].rangeMax, 2**384,
'K-bucket was split, but the second (new) bucket\'s '
'max range was not set properly')
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax,
self.assertEqual(self.routingTable._buckets[0].rangeMax,
self.routingTable._buckets[1].rangeMin,
'K-bucket was split, but the min/max ranges were '
'not divided properly')
@ -159,9 +159,9 @@ class TreeRoutingTableTest(unittest.TestCase):
# self.assertEquals(nodeID, node_ids[i].decode('hex'))
contact = self.contact_manager.make_contact(unhexlify(nodeID), '127.0.0.1', 9182, self.protocol)
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 2)
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), 8)
self.failUnlessEqual(len(self.routingTable._buckets[1]._contacts), 2)
self.assertEqual(len(self.routingTable._buckets), 2)
self.assertEqual(len(self.routingTable._buckets[0]._contacts), 8)
self.assertEqual(len(self.routingTable._buckets[1]._contacts), 2)
# try adding a contact who is further from us than the k'th known contact
nodeID = b'020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
@ -169,11 +169,11 @@ class TreeRoutingTableTest(unittest.TestCase):
contact = self.contact_manager.make_contact(nodeID, '127.0.0.1', 9182, self.protocol)
self.assertFalse(self.routingTable._shouldSplit(self.routingTable._kbucketIndex(contact.id), contact.id))
yield self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 2)
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), 8)
self.failUnlessEqual(len(self.routingTable._buckets[1]._contacts), 2)
self.failIf(contact in self.routingTable._buckets[0]._contacts)
self.failIf(contact in self.routingTable._buckets[1]._contacts)
self.assertEqual(len(self.routingTable._buckets), 2)
self.assertEqual(len(self.routingTable._buckets[0]._contacts), 8)
self.assertEqual(len(self.routingTable._buckets[1]._contacts), 2)
self.assertFalse(contact in self.routingTable._buckets[0]._contacts)
self.assertFalse(contact in self.routingTable._buckets[1]._contacts)
# class KeyErrorFixedTest(unittest.TestCase):

View file

@ -96,7 +96,7 @@ class TestCostEst(unittest.TestCase):
correct_result = 4.5
daemon = get_test_daemon(generous=True, with_fee=True)
result = yield daemon.get_est_cost("test", size)
self.assertEquals(result, correct_result)
self.assertEqual(result, correct_result)
@defer.inlineCallbacks
def test_fee_and_ungenerous_data(self):
@ -106,7 +106,7 @@ class TestCostEst(unittest.TestCase):
correct_result = size / 10 ** 6 * data_rate + fake_fee_amount
daemon = get_test_daemon(generous=False, with_fee=True)
result = yield daemon.get_est_cost("test", size)
self.assertEquals(result, correct_result)
self.assertEqual(result, correct_result)
@defer.inlineCallbacks
def test_generous_data_and_no_fee(self):
@ -114,7 +114,7 @@ class TestCostEst(unittest.TestCase):
correct_result = 0.0
daemon = get_test_daemon(generous=True)
result = yield daemon.get_est_cost("test", size)
self.assertEquals(result, correct_result)
self.assertEqual(result, correct_result)
@defer.inlineCallbacks
def test_ungenerous_data_and_no_fee(self):
@ -123,7 +123,7 @@ class TestCostEst(unittest.TestCase):
correct_result = size / 10 ** 6 * data_rate
daemon = get_test_daemon(generous=False)
result = yield daemon.get_est_cost("test", size)
self.assertEquals(result, correct_result)
self.assertEqual(result, correct_result)
class TestJsonRpc(unittest.TestCase):
@ -174,37 +174,37 @@ class TestFileListSorting(unittest.TestCase):
sort_options = ['points_paid']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(self.test_points_paid, [f['points_paid'] for f in file_list])
self.assertEqual(self.test_points_paid, [f['points_paid'] for f in file_list])
def test_sort_by_points_paid_ascending(self):
sort_options = ['points_paid,asc']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(self.test_points_paid, [f['points_paid'] for f in file_list])
self.assertEqual(self.test_points_paid, [f['points_paid'] for f in file_list])
def test_sort_by_points_paid_descending(self):
sort_options = ['points_paid, desc']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(list(reversed(self.test_points_paid)), [f['points_paid'] for f in file_list])
self.assertEqual(list(reversed(self.test_points_paid)), [f['points_paid'] for f in file_list])
def test_sort_by_file_name_no_direction_specified(self):
sort_options = ['file_name']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(self.test_file_names, [f['file_name'] for f in file_list])
self.assertEqual(self.test_file_names, [f['file_name'] for f in file_list])
def test_sort_by_file_name_ascending(self):
sort_options = ['file_name,asc']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(self.test_file_names, [f['file_name'] for f in file_list])
self.assertEqual(self.test_file_names, [f['file_name'] for f in file_list])
def test_sort_by_file_name_descending(self):
sort_options = ['file_name,desc']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(list(reversed(self.test_file_names)), [f['file_name'] for f in file_list])
self.assertEqual(list(reversed(self.test_file_names)), [f['file_name'] for f in file_list])
def test_sort_by_multiple_criteria(self):
expected = [
@ -225,7 +225,7 @@ class TestFileListSorting(unittest.TestCase):
sort_options = ['file_name,asc', 'points_paid,desc']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(expected, map(format_result, file_list))
self.assertEqual(expected, map(format_result, file_list))
# Check that the list is not sorted as expected when sorted only by file_name.
sort_options = ['file_name,asc']
@ -250,13 +250,13 @@ class TestFileListSorting(unittest.TestCase):
sort_options = ['metadata.author']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(self.test_authors, extract_authors(file_list))
self.assertEqual(self.test_authors, extract_authors(file_list))
# Check that the list matches the expected in reverse when sorting in descending order.
sort_options = ['metadata.author,desc']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
file_list = self.successResultOf(deferred)
self.assertEquals(list(reversed(self.test_authors)), extract_authors(file_list))
self.assertEqual(list(reversed(self.test_authors)), extract_authors(file_list))
# Check that the list is not sorted as expected when not sorted at all.
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list)
@ -269,14 +269,14 @@ class TestFileListSorting(unittest.TestCase):
failure_assertion = self.assertFailure(deferred, Exception)
exception = self.successResultOf(failure_assertion)
expected_message = 'Failed to get "meta.author", key "meta" was not found.'
self.assertEquals(expected_message, exception.message)
self.assertEqual(expected_message, exception.message)
sort_options = ['metadata.foo.bar']
deferred = defer.maybeDeferred(self.test_daemon.jsonrpc_file_list, sort=sort_options)
failure_assertion = self.assertFailure(deferred, Exception)
exception = self.successResultOf(failure_assertion)
expected_message = 'Failed to get "metadata.foo.bar", key "foo" was not found.'
self.assertEquals(expected_message, exception.message)
self.assertEqual(expected_message, exception.message)
def _get_fake_lbry_files(self):
return [self._get_fake_lbry_file() for _ in range(10)]