Upgrade unit tests to use more useful asserts

This commit is contained in:
Hugo 2018-10-18 14:41:33 +03:00 committed by Lex Berezhny
parent 2e6c848c39
commit d0a7ca841b
19 changed files with 80 additions and 80 deletions

View file

@ -30,7 +30,7 @@ class TestPeerExpiration(TestKademliaBase):
self.nodes + self._seeds))
self.assertRaises(AssertionError, self.verify_all_nodes_are_routable)
self.assertTrue(len(get_nodes_with_stale_contacts()) > 1)
self.assertGreater(len(get_nodes_with_stale_contacts()), 1)
# run the network long enough for two failures to happen
self.pump_clock(constants.checkRefreshInterval * 3)

View file

@ -178,21 +178,21 @@ class KademliaProtocolTest(unittest.TestCase):
self._reactor.advance(3)
find_value_response = yield d
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue('protocolVersion' not in find_value_response)
self.assertNotIn('protocolVersion', find_value_response)
self.remote_node.findValue = original_findvalue
d = self.remote_contact.findValue(fake_blob)
self._reactor.advance(3)
find_value_response = yield d
self.assertEqual(self.remote_contact.protocolVersion, 1)
self.assertTrue('protocolVersion' not in find_value_response)
self.assertNotIn('protocolVersion', find_value_response)
self.remote_node.findValue = findValue
d = self.remote_contact.findValue(fake_blob)
self._reactor.advance(3)
find_value_response = yield d
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue('protocolVersion' not in find_value_response)
self.assertNotIn('protocolVersion', find_value_response)
@defer.inlineCallbacks
def testStoreToPre_0_20_0_Node(self):
@ -213,7 +213,7 @@ class KademliaProtocolTest(unittest.TestCase):
@rpcmethod
def store(contact, key, value, originalPublisherID=None, self_store=False, **kwargs):
self.assertTrue(len(key) == 48)
self.assertEqual(len(key), 48)
self.assertSetEqual(set(value.keys()), {b'token', b'lbryid', b'port'})
self.assertFalse(self_store)
self.assertDictEqual(kwargs, {})
@ -230,7 +230,7 @@ class KademliaProtocolTest(unittest.TestCase):
self._reactor.advance(3)
find_value_response = yield d
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue(b'protocolVersion' not in find_value_response)
self.assertNotIn(b'protocolVersion', find_value_response)
token = find_value_response[b'token']
d = self.remote_contact.store(fake_blob, token, 3333, self.node.node_id, 0)
self._reactor.advance(3)
@ -256,7 +256,7 @@ class KademliaProtocolTest(unittest.TestCase):
self._reactor.advance(3)
find_value_response = yield d
self.assertEqual(self.remote_contact.protocolVersion, 0)
self.assertTrue(b'protocolVersion' not in find_value_response)
self.assertNotIn(b'protocolVersion', find_value_response)
token = find_value_response[b'token']
us_from_them.update_protocol_version(0)
d = self.remote_node._protocol.sendRPC(
@ -278,7 +278,7 @@ class KademliaProtocolTest(unittest.TestCase):
result = self.node.findContact(b'0'*48)
for _ in range(6):
self._reactor.advance(1)
self.assertEqual((yield result), None)
self.assertIsNone((yield result))
result = self.node.findContact(self.remote_contact.id)
for _ in range(6):
self._reactor.advance(1)

View file

@ -19,10 +19,10 @@ class TestFindNode(TestKademliaBase):
to_last_node = Distance(last_node_id)
for n in self.nodes:
find_close_nodes_result = n._routingTable.findCloseNodes(last_node_id, constants.k)
self.assertTrue(len(find_close_nodes_result) == constants.k)
self.assertEqual(len(find_close_nodes_result), constants.k)
found_ids = [c.id for c in find_close_nodes_result]
self.assertListEqual(found_ids, sorted(found_ids, key=lambda x: to_last_node(x)))
if last_node_id in [c.id for c in n.contacts]:
self.assertTrue(found_ids[0] == last_node_id)
self.assertEqual(found_ids[0], last_node_id)
else:
self.assertTrue(last_node_id not in found_ids)
self.assertNotIn(last_node_id, found_ids)

View file

@ -77,7 +77,7 @@ class TestStoreExpiration(TestKademliaBase):
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0)
self.assertTrue(blob_hash in node._dataStore) # the looping call shouldn't have removed it yet
self.assertIn(blob_hash, node._dataStore) # the looping call shouldn't have removed it yet
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
@ -86,7 +86,7 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
self.assertTrue(blob_hash not in node._dataStore.keys()) # the looping call should have fired
self.assertNotIn(blob_hash, node._dataStore.keys()) # the looping call should have fired
@defer.inlineCallbacks
def test_storing_node_went_stale_then_came_back(self):
@ -136,7 +136,7 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.assertTrue(blob_hash in node._dataStore)
self.assertIn(blob_hash, node._dataStore)
# # bring the announcing node back online
self.nodes.append(announcing_node)
@ -152,7 +152,7 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 1)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.assertTrue(blob_hash in node._dataStore)
self.assertIn(blob_hash, node._dataStore)
# verify the announced blob expires in the storing nodes datastores
self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead
@ -160,7 +160,7 @@ class TestStoreExpiration(TestKademliaBase):
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0)
self.assertTrue(blob_hash in node._dataStore) # the looping call shouldn't have removed it yet
self.assertIn(blob_hash, node._dataStore) # the looping call shouldn't have removed it yet
self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
@ -169,4 +169,4 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
self.assertTrue(blob_hash not in node._dataStore) # the looping call should have fired
self.assertNotIn(blob_hash, node._dataStore) # the looping call should have fired

View file

@ -7,7 +7,7 @@ class TrackTest(unittest.TestCase):
def test_empty_summarize_is_none(self):
track = analytics.Manager(None, 'x', 'y', 'z')
_, result = track.summarize_and_reset('a')
self.assertEqual(None, result)
self.assertIsNone(result)
def test_can_get_sum_of_metric(self):
track = analytics.Manager(None, 'x', 'y', 'z')
@ -24,4 +24,4 @@ class TrackTest(unittest.TestCase):
track.summarize_and_reset('metric')
_, result = track.summarize_and_reset('metric')
self.assertEqual(None, result)
self.assertIsNone(result)

View file

@ -228,7 +228,7 @@ class TestIntegrationConnectionManager(TestCase):
self.assertEqual(0, self.TEST_PEER.success_count)
self.assertEqual(1, self.TEST_PEER.down_count)
self.assertEqual(0, self.connection_manager.num_peer_connections())
self.assertEqual(None, self.connection_manager._next_manage_call)
self.assertIsNone(self.connection_manager._next_manage_call)
@defer.inlineCallbacks
def test_closed_connection_when_server_is_slow(self):
@ -242,7 +242,7 @@ class TestIntegrationConnectionManager(TestCase):
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\
factory.connection_was_made_deferred
self.assertEqual(0, self.connection_manager.num_peer_connections())
self.assertEqual(True, connection_made)
self.assertTrue(connection_made)
self.assertEqual(0, self.TEST_PEER.success_count)
self.assertEqual(1, self.TEST_PEER.down_count)

View file

@ -113,7 +113,7 @@ class TestBlobRequestHandlerSender(unittest.TestCase):
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
handler.currently_uploading = None
deferred = handler.send_blob_if_requested(None)
self.assertEqual(True, self.successResultOf(deferred))
self.assertTrue(self.successResultOf(deferred))
def test_file_is_sent_to_consumer(self):
# TODO: also check that the expected payment values are set

View file

@ -54,7 +54,7 @@ class BlobManagerTest(unittest.TestCase):
# check to see if blob is there
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash)))
blobs = yield self.bm.get_all_verified_blobs()
self.assertTrue(blob_hash in blobs)
self.assertIn(blob_hash, blobs)
defer.returnValue(blob_hash)
@defer.inlineCallbacks
@ -82,7 +82,7 @@ class BlobManagerTest(unittest.TestCase):
self.assertEqual(len(blobs), 0)
blobs = yield self.bm.storage.get_all_blob_hashes()
self.assertEqual(len(blobs), 0)
self.assertFalse(blob_hash in self.bm.blobs)
self.assertNotIn(blob_hash, self.bm.blobs)
# delete blob that was already deleted once
yield self.bm.delete_blobs([blob_hash])
@ -116,7 +116,7 @@ class BlobManagerTest(unittest.TestCase):
yield self.bm.delete_blobs([blob_hash])
blobs = yield self.bm.get_all_verified_blobs()
self.assertEqual(len(blobs), 10)
self.assertTrue(blob_hashes[-1] in blobs)
self.assertIn(blob_hashes[-1], blobs)
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hashes[-1])))
@defer.inlineCallbacks

View file

@ -42,7 +42,7 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.client.collect.side_effect = bad_collect
yield self.downloader.start()
self.assertEqual(self.blob.get_length(), self.response.length)
self.assertEqual(self.blob.get_is_verified(), False)
self.assertFalse(self.blob.get_is_verified())
self.assertEqual(self.blob.writers, {})
@defer.inlineCallbacks
@ -62,7 +62,7 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.assertEqual(len(self.client.collect.mock_calls), self.downloader.max_failures)
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
self.assertEqual(self.blob.get_length(), self.response.length)
self.assertEqual(self.blob.get_is_verified(), False)
self.assertFalse(self.blob.get_is_verified())
self.assertEqual(self.blob.writers, {})
@defer.inlineCallbacks
@ -72,7 +72,7 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
self.client.collect.assert_not_called()
self.assertEqual(self.blob.get_is_verified(), False)
self.assertFalse(self.blob.get_is_verified())
self.assertEqual(self.blob.writers, {})
def test_stop(self):
@ -83,7 +83,7 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
self.client.collect.assert_called()
self.assertEqual(self.blob.get_length(), self.response.length)
self.assertEqual(self.blob.get_is_verified(), False)
self.assertFalse(self.blob.get_is_verified())
self.assertEqual(self.blob.writers, {})

View file

@ -27,7 +27,7 @@ class BlobFileTest(unittest.TestCase):
writer.write(self.fake_content)
writer.close()
out = yield finished_d
self.assertTrue(isinstance(out, BlobFile))
self.assertIsInstance(out, BlobFile)
self.assertTrue(out.verified)
self.assertEqual(self.fake_content_len, out.get_length())
@ -119,7 +119,7 @@ class BlobFileTest(unittest.TestCase):
# file should not exist, since we did not finish write
blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
out = blob_file_2.open_for_reading()
self.assertEqual(None, out)
self.assertIsNone(out)
@defer.inlineCallbacks
def test_multiple_writers(self):
@ -134,7 +134,7 @@ class BlobFileTest(unittest.TestCase):
out_2 = yield finished_d_2
out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError)
self.assertTrue(isinstance(out_2, BlobFile))
self.assertIsInstance(out_2, BlobFile)
self.assertTrue(out_2.verified)
self.assertEqual(self.fake_content_len, out_2.get_length())

View file

@ -101,12 +101,12 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase):
response2 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.update_accepted_offers(host, response2)
self.assertEqual(response1.is_too_low, False)
self.assertEqual(response1.is_accepted, True)
self.assertFalse(response1.is_too_low)
self.assertTrue(response1.is_accepted)
self.assertEqual(response1.rate, 0.0)
self.assertEqual(response2.is_too_low, False)
self.assertEqual(response2.is_accepted, True)
self.assertFalse(response2.is_too_low)
self.assertTrue(response2.is_accepted)
self.assertEqual(response2.rate, 0.0)
def test_how_many_turns_before_accept_with_similar_rate_settings(self):

View file

@ -123,7 +123,7 @@ class WalletTest(unittest.TestCase):
self.wallet._send_name_claim = success_send_name_claim
claim_out = yield self.wallet.claim_name('test', 1, test_claim_dict)
self.assertTrue('success' not in claim_out)
self.assertNotIn('success', claim_out)
self.assertEqual(expected_claim_out['claim_id'], claim_out['claim_id'])
self.assertEqual(expected_claim_out['fee'], claim_out['fee'])
self.assertEqual(expected_claim_out['nout'], claim_out['nout'])
@ -218,7 +218,7 @@ class WalletTest(unittest.TestCase):
self.assertEqual(3, self.wallet.get_balance())
# test failed point reservation
out = yield self.wallet.reserve_points('testid', 4)
self.assertEqual(None, out)
self.assertIsNone(out)
def test_point_reservation_and_claim(self):
# check that claims take into consideration point reservations

View file

@ -241,7 +241,7 @@ class FileStorageTests(StorageTest):
file_name = 'encrypted_file_saver_test.tmp'
self.assertFalse(os.path.isfile(file_name))
written_to = yield open_file_for_writing(self.db_dir, file_name)
self.assertTrue(written_to == file_name)
self.assertEqual(written_to, file_name)
self.assertTrue(os.path.isfile(os.path.join(self.db_dir, file_name)))
@defer.inlineCallbacks

View file

@ -28,8 +28,8 @@ class ContactTest(unittest.TestCase):
ValueError, self.contact_manager.make_contact, b'not valid node id', '192.168.1.20.1', 1000, None)
def test_no_duplicate_contact_objects(self):
self.assertTrue(self.second_contact is self.second_contact_second_reference)
self.assertTrue(self.first_contact is not self.first_contact_different_values)
self.assertIs(self.second_contact, self.second_contact_second_reference)
self.assertIsNot(self.first_contact, self.first_contact_different_values)
def test_boolean(self):
""" Test "equals" and "not equals" comparisons """
@ -65,68 +65,68 @@ class TestContactLastReplied(unittest.TestCase):
self.contact_manager = ContactManager(self.clock.seconds)
self.contact = self.contact_manager.make_contact(generate_id(), "127.0.0.1", 4444, None)
self.clock.advance(3600)
self.assertTrue(self.contact.contact_is_good is None)
self.assertIsNone(self.contact.contact_is_good)
def test_stale_replied_to_us(self):
self.contact.update_last_replied()
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
def test_stale_requested_from_us(self):
self.contact.update_last_requested()
self.assertTrue(self.contact.contact_is_good is None)
self.assertIsNone(self.contact.contact_is_good)
def test_stale_then_fail(self):
self.contact.update_last_failed()
self.assertTrue(self.contact.contact_is_good is None)
self.assertIsNone(self.contact.contact_is_good)
self.clock.advance(1)
self.contact.update_last_failed()
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)
def test_good_turned_stale(self):
self.contact.update_last_replied()
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(constants.checkRefreshInterval - 1)
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(1)
self.assertTrue(self.contact.contact_is_good is None)
self.assertIsNone(self.contact.contact_is_good)
def test_good_then_fail(self):
self.contact.update_last_replied()
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(1)
self.contact.update_last_failed()
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(59)
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.contact.update_last_failed()
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)
for _ in range(7200):
self.clock.advance(60)
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)
def test_good_then_fail_then_good(self):
# it replies
self.contact.update_last_replied()
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(1)
# it fails twice in a row
self.contact.update_last_failed()
self.clock.advance(1)
self.contact.update_last_failed()
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)
self.clock.advance(1)
# it replies
self.contact.update_last_replied()
self.clock.advance(1)
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
# it goes stale
self.clock.advance(constants.checkRefreshInterval - 2)
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(1)
self.assertTrue(self.contact.contact_is_good is None)
self.assertIsNone(self.contact.contact_is_good)
class TestContactLastRequested(unittest.TestCase):
@ -137,39 +137,39 @@ class TestContactLastRequested(unittest.TestCase):
self.clock.advance(1)
self.contact.update_last_replied()
self.clock.advance(3600)
self.assertTrue(self.contact.contact_is_good is None)
self.assertIsNone(self.contact.contact_is_good)
def test_previous_replied_then_requested(self):
# it requests
self.contact.update_last_requested()
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
# it goes stale
self.clock.advance(constants.checkRefreshInterval - 1)
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(1)
self.assertTrue(self.contact.contact_is_good is None)
self.assertIsNone(self.contact.contact_is_good)
def test_previous_replied_then_requested_then_failed(self):
# it requests
self.contact.update_last_requested()
self.assertTrue(self.contact.contact_is_good is True)
self.assertIs(self.contact.contact_is_good, True)
self.clock.advance(1)
# it fails twice in a row
self.contact.update_last_failed()
self.clock.advance(1)
self.contact.update_last_failed()
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)
self.clock.advance(1)
# it requests
self.contact.update_last_requested()
self.clock.advance(1)
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)
# it goes stale
self.clock.advance((constants.refreshTimeout / 4) - 2)
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)
self.clock.advance(1)
self.assertTrue(self.contact.contact_is_good is False)
self.assertIs(self.contact.contact_is_good, False)

View file

@ -122,4 +122,4 @@ class KBucketTest(unittest.TestCase):
# try remove contact from empty list
self.kbucket.addContact(rmContact)
result = self.kbucket.removeContact(rmContact)
self.assertFalse(rmContact in self.kbucket._contacts, "Could not remove contact from bucket")
self.assertNotIn(rmContact, self.kbucket._contacts, "Could not remove contact from bucket")

View file

@ -53,7 +53,7 @@ class NodeDataTest(unittest.TestCase):
expected_result = self.contact.compact_ip() + struct.pack('>H', value) + self.contact.id
self.assertTrue(self.node._dataStore.hasPeersForBlob(key),
"Stored key not found in node's DataStore: '%s'" % key)
self.assertTrue(expected_result in self.node._dataStore.getPeersForBlob(key),
self.assertIn(expected_result, self.node._dataStore.getPeersForBlob(key),
"Stored val not found in node's DataStore: key:'%s' port:'%s' %s"
% (key, value, self.node._dataStore.getPeersForBlob(key)))

View file

@ -151,8 +151,8 @@ class TreeRoutingTableTest(unittest.TestCase):
self.assertEqual(len(self.routingTable._buckets), 2)
self.assertEqual(len(self.routingTable._buckets[0]._contacts), 8)
self.assertEqual(len(self.routingTable._buckets[1]._contacts), 2)
self.assertFalse(contact in self.routingTable._buckets[0]._contacts)
self.assertFalse(contact in self.routingTable._buckets[1]._contacts)
self.assertNotIn(contact, self.routingTable._buckets[0]._contacts)
self.assertNotIn(contact, self.routingTable._buckets[1]._contacts)
# class KeyErrorFixedTest(unittest.TestCase):

View file

@ -20,13 +20,13 @@ class CLITest(unittest.TestCase):
normalize_value('VdNmakxFORPSyfCprAD/eDDPk5TY9QYtSA==')
)
self.assertEqual(True, normalize_value('TRUE'))
self.assertEqual(True, normalize_value('true'))
self.assertEqual(True, normalize_value('TrUe'))
self.assertEqual(False, normalize_value('FALSE'))
self.assertEqual(False, normalize_value('false'))
self.assertEqual(False, normalize_value('FaLsE'))
self.assertEqual(True, normalize_value(True))
self.assertTrue(normalize_value('TRUE'))
self.assertTrue(normalize_value('true'))
self.assertTrue(normalize_value('TrUe'))
self.assertFalse(normalize_value('FALSE'))
self.assertFalse(normalize_value('false'))
self.assertFalse(normalize_value('FaLsE'))
self.assertTrue(normalize_value(True))
self.assertEqual('3', normalize_value('3', key="uri"))
self.assertEqual('0.3', normalize_value('0.3', key="uri"))

View file

@ -40,4 +40,4 @@ class ClaimProofsTestCase(unittest.TestCase):
]
}
out = verify_proof(proof, hexlify(root_hash[::-1]), 'a')
self.assertEqual(out, True)
self.assertTrue(out)