lbry-sdk/tests/functional/dht/test_store.py

173 lines
8.8 KiB
Python
Raw Normal View History

2018-05-24 10:23:22 -04:00
import struct
2018-07-21 18:19:30 -03:00
from binascii import hexlify
2018-05-24 10:23:22 -04:00
from twisted.internet import defer
from lbrynet.dht import constants
2018-11-07 16:12:11 -05:00
from lbrynet.utils import generate_id
2018-07-21 18:19:30 -03:00
from .dht_test_environment import TestKademliaBase
2018-05-24 10:23:22 -04:00
import logging
# Module-level handle on the root logger (getLogger() is called without a name).
log = logging.getLogger()
2018-05-29 16:22:30 -04:00
class TestStoreExpiration(TestKademliaBase):
    """Functional tests for announcing a blob on the simulated DHT network.

    The tests announce a blob from one node and then inspect the ``_dataStore``
    of the nodes that accepted the announcement: that the announcement was
    stored, that it stops being returned once it expires or its announcer goes
    away, and that the entry is eventually removed.

    Helpers prefixed with ``_`` are shared by the tests and are not collected
    as test cases.
    """

    # Read by TestKademliaBase (defined outside this file); presumably the
    # number of nodes in the simulated network -- confirm against
    # dht_test_environment.
    network_size = 40

    def _announce(self, announcing_node, blob_hash):
        """Start announcing *blob_hash* from *announcing_node* and pump the clock.

        Returns the deferred produced by ``announceHaveBlob``. Callers ``yield``
        it to obtain the ids of the nodes that stored the announcement.
        """
        announce_d = announcing_node.announceHaveBlob(blob_hash)
        self.pump_clock(5 + 1)
        return announce_d

    def _find_storing_nodes(self, storing_node_ids):
        """Return every node (seeds included) whose hex-encoded id is in *storing_node_ids*."""
        all_nodes = set(self.nodes).union(set(self._seeds))
        return [node for node in all_nodes if hexlify(node.node_id) in storing_node_ids]

    @staticmethod
    def _expand_peers(datastore_result):
        """Decode compact peer entries into unique ``(node_id, host, port)`` tuples.

        Each entry is read as 4 address bytes, a 2-byte big-endian port and the
        node id in the remaining bytes. Order of first appearance is kept.
        """
        expanded_peers = []
        for peer in datastore_result:
            host = ".".join(str(d) for d in peer[:4])
            port, = struct.unpack('>H', peer[4:6])
            # Build the tuple once so the membership test and the stored value
            # use the same field order (the duplicate check previously compared
            # a differently ordered tuple and therefore never matched).
            expanded = (peer[6:], host, port)
            if expanded not in expanded_peers:
                expanded_peers.append(expanded)
        return expanded_peers

    def _assert_datastore_state(self, storing_nodes, blob_hash, peer_count, contact_count, has_entry):
        """Assert the datastore view of *blob_hash* on every node in *storing_nodes*.

        :param peer_count: expected ``len(getPeersForBlob(blob_hash))``; ``hasPeersForBlob``
            must be truthy exactly when this is non-zero.
        :param contact_count: expected ``len(getStoringContacts())``.
        :param has_entry: whether *blob_hash* must still be a key of the datastore.
        """
        for node in storing_nodes:
            has_peers = node._dataStore.hasPeersForBlob(blob_hash)
            if peer_count:
                self.assertTrue(has_peers)
            else:
                self.assertFalse(has_peers)
            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
            self.assertEqual(len(datastore_result), peer_count)
            self.assertEqual(len(node._dataStore.getStoringContacts()), contact_count)
            if has_entry:
                self.assertIn(blob_hash, node._dataStore)
            else:
                self.assertNotIn(blob_hash, node._dataStore)

    def _assert_announcement_stored(self, storing_nodes, announcing_node, blob_hash):
        """Assert each storing node holds exactly one peer for the blob: the announcer.

        The storing contact is compared using the announcer's ``port`` while the
        compact peer entry is compared using its ``peerPort``.
        """
        for node in storing_nodes:
            self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
            storing_contacts = [
                (contact.id, contact.address, contact.port)
                for contact in node._dataStore.getStoringContacts()
            ]
            self.assertEqual(
                storing_contacts,
                [(announcing_node.node_id, announcing_node.externalIP, announcing_node.port)]
            )
            self.assertEqual(len(datastore_result), 1)
            expanded_peers = self._expand_peers(datastore_result)
            self.assertEqual(
                expanded_peers[0],
                (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort)
            )

    def _assert_announcement_expires(self, storing_nodes, blob_hash):
        """Advance past the expiry timeout and assert the announcement is dropped in two steps."""
        # skip the clock directly ahead
        self.clock.advance(constants.dataExpireTimeout)
        # the peer is no longer returned, but the looping call shouldn't have removed the entry yet
        self._assert_datastore_state(storing_nodes, blob_hash, peer_count=0, contact_count=1, has_entry=True)
        # tick the clock forward (so the nodes refresh)
        self.pump_clock(constants.checkRefreshInterval + 1)
        # the looping call should have fired and removed the entry
        self._assert_datastore_state(storing_nodes, blob_hash, peer_count=0, contact_count=0, has_entry=False)

    @defer.inlineCallbacks
    def test_nullify_token(self):
        """A store with stale tokens fails once, and the following attempt succeeds."""
        blob_hash = generate_id(1)
        announcing_node = self.nodes[20]
        # announce the blob
        storing_node_ids = yield self._announce(announcing_node, blob_hash)
        self.assertEqual(len(storing_node_ids), constants.k)

        for node in set(self.nodes).union(set(self._seeds)):
            # now, everyone has the wrong token
            # (rotated twice -- presumably a single rotation would still accept the previous token)
            node.change_token()
            node.change_token()

        storing_node_ids = yield self._announce(announcing_node, blob_hash)
        self.assertEqual(len(storing_node_ids), 0)  # can't store, wrong tokens, but they get nullified

        storing_node_ids = yield self._announce(announcing_node, blob_hash)
        self.assertEqual(len(storing_node_ids), constants.k)  # next attempt succeeds as it refreshes tokens

    @defer.inlineCallbacks
    def test_store_and_expire(self):
        """An announced blob is stored by ``constants.k`` nodes and later expires from them."""
        blob_hash = generate_id(1)
        announcing_node = self.nodes[20]
        # announce the blob
        storing_node_ids = yield self._announce(announcing_node, blob_hash)

        # verify the nodes we think stored it did actually store it
        storing_nodes = self._find_storing_nodes(storing_node_ids)
        self.assertEqual(len(storing_nodes), len(storing_node_ids))
        self.assertEqual(len(storing_nodes), constants.k)
        self._assert_announcement_stored(storing_nodes, announcing_node, blob_hash)

        # verify the announced blob expires in the storing nodes datastores
        self._assert_announcement_expires(storing_nodes, blob_hash)

    @defer.inlineCallbacks
    def test_storing_node_went_stale_then_came_back(self):
        """The announcer's entry is hidden while it is offline and returned again once it is back."""
        blob_hash = generate_id(1)
        announcing_node = self.nodes[20]
        # announce the blob
        storing_node_ids = yield self._announce(announcing_node, blob_hash)

        # verify the nodes we think stored it did actually store it
        storing_nodes = self._find_storing_nodes(storing_node_ids)
        self.assertEqual(len(storing_nodes), len(storing_node_ids))
        self.assertEqual(len(storing_nodes), constants.k)
        self._assert_announcement_stored(storing_nodes, announcing_node, blob_hash)

        self.pump_clock(constants.checkRefreshInterval*2)

        # stop the node
        self.nodes.remove(announcing_node)
        yield self.run_reactor(31, [announcing_node.stop()])
        # run the network for an hour, which should expire the removed node and turn the announced value stale
        self.pump_clock(constants.checkRefreshInterval * 5, constants.checkRefreshInterval/2)
        self.verify_all_nodes_are_routable()

        # make sure the contact isn't returned as a peer for the blob, but that we still have the entry in the
        # datastore in case the node comes back
        self._assert_datastore_state(storing_nodes, blob_hash, peer_count=0, contact_count=1, has_entry=True)

        # bring the announcing node back online
        self.nodes.append(announcing_node)
        yield self.run_reactor(
            31, [announcing_node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])]
        )
        self.pump_clock(constants.checkRefreshInterval * 2)
        self.verify_all_nodes_are_routable()

        # now the announcing node should once again be returned as a peer for the blob
        self._assert_datastore_state(storing_nodes, blob_hash, peer_count=1, contact_count=1, has_entry=True)

        # verify the announced blob expires in the storing nodes datastores
        self._assert_announcement_expires(storing_nodes, blob_hash)