omit bad contacts from getPeersForBlob

-refresh stored contacts to detect when they go offline or come back
Jack Robison 2018-05-24 15:52:37 -04:00
parent 98e21cdba0
commit 07f92014d7
4 changed files with 198 additions and 27 deletions
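
In outline, the change has three parts: datastore entries now carry the storing contact object itself, getPeersForBlob skips peers whose contact is known to be bad, and the node's periodic refresh pings the storing contacts so their good/bad state stays current. A minimal, self-contained sketch of the filtering rule (the three-valued `contact_is_good` is taken from the diff below: `True` for good, `None` for unknown, `False` for bad; the names and values here are otherwise illustrative):

```python
import time

DATA_EXPIRE_TIMEOUT = 86400  # stand-in for constants.dataExpireTimeout


class FakeContact(object):
    """Stand-in for lbrynet's _Contact; only the attribute the filter reads."""
    def __init__(self, contact_is_good=None):
        self.contact_is_good = contact_is_good  # True=good, None=unknown, False=bad


def usable_peers(entries, now=None):
    """Entries are (contact, compact_address, lastPublished, originallyPublished, publisherID)."""
    now = now if now is not None else time.time()
    return [
        entry for entry in entries
        if now - entry[3] < DATA_EXPIRE_TIMEOUT      # not expired
        and entry[0].contact_is_good is not False    # unknown or good contacts still count
    ]


now = time.time()
entries = [
    (FakeContact(True), b'good-peer', now, now, b'id1'),
    (FakeContact(None), b'unknown-peer', now, now, b'id2'),
    (FakeContact(False), b'bad-peer', now, now, b'id3'),
]
assert [e[1] for e in usable_peers(entries, now)] == [b'good-peer', b'unknown-peer']
```

Unknown contacts are deliberately kept, so freshly stored peers are still returned before the first refresh has had a chance to ping them.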

View file

@@ -1,6 +1,15 @@
+import ipaddress
 from lbrynet.dht import constants
 
 
+def is_valid_ipv4(address):
+    try:
+        ip = ipaddress.ip_address(address.decode())  # this needs to be unicode, thus the decode()
+        return ip.version == 4
+    except ipaddress.AddressValueError:
+        return False
+
+
 class _Contact(object):
     """ Encapsulation for remote contact
@@ -9,11 +18,15 @@ class _Contact(object):
     """
 
     def __init__(self, contactManager, id, ipAddress, udpPort, networkProtocol, firstComm):
-        self._contactManager = contactManager
-        self._id = id
         if id is not None:
             if not len(id) == constants.key_bits / 8:
                 raise ValueError("invalid node id: %s" % id.encode('hex'))
+        if not 0 <= udpPort <= 65536:
+            raise ValueError("invalid port")
+        if not is_valid_ipv4(ipAddress):
+            raise ValueError("invalid ip address")
+        self._contactManager = contactManager
+        self._id = id
         self.address = ipAddress
         self.port = udpPort
         self._networkProtocol = networkProtocol
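
A quick illustration of the new constructor guards, assuming Python 3's stdlib `ipaddress` (this sketch catches `ValueError`, which `ipaddress.ip_address` raises for strings that are not addresses at all and which subsumes the `AddressValueError` caught above):

```python
import ipaddress


def is_valid_ipv4_sketch(address):
    # ip_address() wants unicode, hence the decode(); anything that is not an
    # IP literal raises ValueError, and IPv6 literals fail the version check.
    try:
        return ipaddress.ip_address(address.decode()).version == 4
    except ValueError:
        return False


assert is_valid_ipv4_sketch(b"127.0.0.1")
assert not is_valid_ipv4_sketch(b"example.com")  # a _Contact built with this now raises ValueError("invalid ip address")
assert not is_valid_ipv4_sketch(b"::1")          # IPv6 is rejected too
```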

View file

@@ -21,36 +21,54 @@ class DictDataStore(UserDict.DictMixin):
         """ Return a list of the keys in this data store """
         return self._dict.keys()
 
+    def filter_bad_and_expired_peers(self, key):
+        """
+        Returns only non-expired and unknown/good peers
+        """
+        return filter(
+            lambda peer:
+            self._getTime() - peer[3] < constants.dataExpireTimeout and peer[0].contact_is_good is not False,
+            self._dict[key]
+        )
+
+    def filter_expired_peers(self, key):
+        """
+        Returns only non-expired peers
+        """
+        return filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout, self._dict[key])
+
     def removeExpiredPeers(self):
-        now = int(self._getTime())
         for key in self._dict.keys():
-            unexpired_peers = filter(lambda peer: now - peer[2] < constants.dataExpireTimeout, self._dict[key])
+            unexpired_peers = self.filter_expired_peers(key)
             if not unexpired_peers:
                 del self._dict[key]
             else:
                 self._dict[key] = unexpired_peers
 
     def hasPeersForBlob(self, key):
-        if key in self._dict and len(filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout,
-                                            self._dict[key])):
+        if key in self._dict and len(self.filter_bad_and_expired_peers(key)):
             return True
         return False
 
-    def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
+    def addPeerToBlob(self, contact, key, compact_address, lastPublished, originallyPublished, originalPublisherID):
         if key in self._dict:
-            if value not in map(lambda store_tuple: store_tuple[0], self._dict[key]):
-                self._dict[key].append((value, lastPublished, originallyPublished, originalPublisherID))
+            if compact_address not in map(lambda store_tuple: store_tuple[1], self._dict[key]):
+                self._dict[key].append((contact, compact_address, lastPublished, originallyPublished, originalPublisherID))
         else:
-            self._dict[key] = [(value, lastPublished, originallyPublished, originalPublisherID)]
+            self._dict[key] = [(contact, compact_address, lastPublished, originallyPublished, originalPublisherID)]
 
     def getPeersForBlob(self, key):
-        return [] if key not in self._dict else [
-            val[0] for val in filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout,
-                                     self._dict[key])
-        ]
+        return [] if key not in self._dict else [val[1] for val in self.filter_bad_and_expired_peers(key)]
 
     def removePeer(self, value):
         for key in self._dict:
-            self._dict[key] = [val for val in self._dict[key] if val[0] != value]
+            self._dict[key] = [val for val in self._dict[key] if val[1] != value]
             if not self._dict[key]:
                 del self._dict[key]
+
+    def getStoringContacts(self):
+        contacts = set()
+        for key in self._dict:
+            for values in self._dict[key]:
+                contacts.add(values[0])
+        return list(contacts)
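
For orientation, the value stored under each blob hash grows from a 4-tuple to a 5-tuple: index 0 now holds the contact object (consulted by `filter_bad_and_expired_peers` and `getStoringContacts`) and index 1 the compact address that `getPeersForBlob` returns. A Python 3 sketch of one entry with placeholder bytes (the 48-byte id length follows from the `constants.key_bits / 8` check above, assuming 384-bit keys):

```python
import struct
import time

# 4-byte IP + 2-byte big-endian port + node id, as assembled in Node.store()
compact_ip = bytes([10, 0, 0, 2])
compact_port = struct.pack('>H', 3333)
node_id = b'\xaa' * 48           # placeholder 384-bit node id
compact_address = compact_ip + compact_port + node_id

now = int(time.time())
age = 0
contact = object()               # placeholder for the rpc_contact passed to addPeerToBlob

# old layout: (value, lastPublished, originallyPublished, originalPublisherID)
# new layout:
entry = (contact, compact_address, now, now - age, node_id)
```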

View file

@@ -506,15 +506,20 @@ class Node(MockKademliaHelper):
     @rpcmethod
     def store(self, rpc_contact, blob_hash, token, port, originalPublisherID=None, age=0):
-        """ Store the received data in this node's local hash table
+        """ Store the received data in this node's local datastore
 
-        @param blob_hash: The hashtable key of the data
+        @param blob_hash: The hash of the data
         @type blob_hash: str
-        @param value: The actual data (the value associated with C{key})
-        @type value: str
-        @param originalPublisherID: The node ID of the node that is the
-                                    B{original} publisher of the data
+
+        @param token: The token we previously returned when this contact sent us a findValue
+        @type token: str
+
+        @param port: The TCP port the contact is listening on for requests for this blob (the peerPort)
+        @type port: int
+
+        @param originalPublisherID: The node ID of the node that is the publisher of the data
         @type originalPublisherID: str
+
         @param age: The relative age of the data (time in seconds since it was
                     originally published). Note that the original publish time
                     isn't actually given, to compensate for clock skew between
@@ -522,11 +527,8 @@ class Node(MockKademliaHelper):
         @type age: int
         @rtype: str
 
-        @todo: Since the data (value) may be large, passing it around as a buffer
-               (which is the case currently) might not be a good idea... will have
-               to fix this (perhaps use a stream from the Protocol class?)
         """
         if originalPublisherID is None:
             originalPublisherID = rpc_contact.id
         compact_ip = rpc_contact.compact_ip()
@@ -536,11 +538,11 @@ class Node(MockKademliaHelper):
             compact_port = str(struct.pack('>H', port))
         else:
             raise TypeError('Invalid port')
         compact_address = compact_ip + compact_port + rpc_contact.id
         now = int(self.clock.seconds())
         originallyPublished = now - age
-        self._dataStore.addPeerToBlob(blob_hash, compact_address, now, originallyPublished, originalPublisherID)
+        self._dataStore.addPeerToBlob(rpc_contact, blob_hash, compact_address, now, originallyPublished,
+                                      originalPublisherID)
         return 'OK'
 
     @rpcmethod
@@ -658,6 +660,7 @@ class Node(MockKademliaHelper):
         replication/republishing as necessary """
         yield self._refreshRoutingTable()
         self._dataStore.removeExpiredPeers()
+        yield self._refreshStoringPeers()
         defer.returnValue(None)
 
     def _refreshContacts(self):
@@ -665,6 +668,13 @@ class Node(MockKademliaHelper):
             [contact.ping() for contact in self.contacts], consumeErrors=True
         )
 
+    def _refreshStoringPeers(self):
+        storing_contacts = self._dataStore.getStoringContacts()
+        return defer.DeferredList(
+            [self._protocol._ping_queue.enqueue_maybe_ping(contact) for contact in storing_contacts],
+            consumeErrors=True
+        )
+
     @defer.inlineCallbacks
     def _refreshRoutingTable(self):
         nodeIDs = self._routingTable.getRefreshList(0, True)
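
The effect of `_refreshStoringPeers` is a liveness loop: on every refresh the node asks its ping queue to (maybe) ping each storing contact, repeated failures eventually mark a contact bad, and a later successful ping marks it good again, so its peers reappear in lookups without a re-announce. A toy model of that lifecycle (`ToyContact` is illustrative; the real state transitions live in lbrynet's contact manager and ping queue, which this diff only touches via `enqueue_maybe_ping`):

```python
class ToyContact(object):
    def __init__(self):
        self.contact_is_good = None       # unknown until we hear from it

    def ping_succeeded(self):
        self.contact_is_good = True

    def ping_failed(self):
        self.contact_is_good = False


def visible(peers):
    # mirrors the getPeersForBlob rule: only known-bad contacts are omitted
    return [address for contact, address in peers if contact.contact_is_good is not False]


c = ToyContact()
peers = [(c, b'compact-address')]
assert visible(peers)        # unknown contacts are still returned
c.ping_failed()
assert not visible(peers)    # offline: omitted, but the entry itself is kept
c.ping_succeeded()
assert visible(peers)        # back online: returned again, no re-announce needed
```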

View file

@@ -28,6 +28,10 @@ class TestStore(TestKademliaBase):
         for node in storing_nodes:
             self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
             datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port),
+                                  node._dataStore.getStoringContacts()), [(announcing_node.node_id,
+                                                                           announcing_node.externalIP,
+                                                                           announcing_node.port)])
             self.assertEquals(len(datastore_result), 1)
             expanded_peers = []
             for peer in datastore_result:
@@ -47,10 +51,136 @@
             datastore_result = node._dataStore.getPeersForBlob(blob_hash)
             self.assertEquals(len(datastore_result), 0)
             self.assertTrue(blob_hash in node._dataStore._dict)  # the looping call shouldn't have removed it yet
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
 
         self.pump_clock(constants.checkRefreshInterval + 1)  # tick the clock forward (so the nodes refresh)
         for node in storing_nodes:
             self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
             datastore_result = node._dataStore.getPeersForBlob(blob_hash)
             self.assertEquals(len(datastore_result), 0)
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 0)
+            self.assertTrue(blob_hash not in node._dataStore._dict)  # the looping call should have fired
+
+    @defer.inlineCallbacks
+    def test_refresh_storing_peers(self):
+        blob_hash = generate_id()
+        announcing_node = self.nodes[20]
+        # announce the blob
+        announce_d = announcing_node.announceHaveBlob(blob_hash)
+        self.pump_clock(5)
+        storing_node_ids = yield announce_d
+        all_nodes = set(self.nodes).union(set(self._seeds))
+
+        # verify the nodes we think stored it did actually store it
+        storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids]
+        self.assertEquals(len(storing_nodes), len(storing_node_ids))
+        self.assertEquals(len(storing_nodes), constants.k)
+
+        for node in storing_nodes:
+            self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port),
+                                  node._dataStore.getStoringContacts()), [(announcing_node.node_id,
+                                                                           announcing_node.externalIP,
+                                                                           announcing_node.port)])
+            self.assertEquals(len(datastore_result), 1)
+            expanded_peers = []
+            for peer in datastore_result:
+                host = ".".join([str(ord(d)) for d in peer[:4]])
+                port, = struct.unpack('>H', peer[4:6])
+                peer_node_id = peer[6:]
+                if (host, port, peer_node_id) not in expanded_peers:
+                    expanded_peers.append((peer_node_id, host, port))
+            self.assertEquals(expanded_peers[0],
+                              (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
+
+        self.pump_clock(constants.checkRefreshInterval + 1)  # tick the clock forward (so the nodes refresh)
+
+        # verify the announced blob expires in the storing nodes datastores
+        self.clock.advance(constants.dataExpireTimeout)  # skip the clock directly ahead
+        for node in storing_nodes:
+            self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 0)
+            self.assertTrue(blob_hash in node._dataStore._dict)  # the looping call shouldn't have removed it yet
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
+
+        self.pump_clock(constants.checkRefreshInterval + 1)  # tick the clock forward (so the nodes refresh)
+        for node in storing_nodes:
+            self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 0)
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 0)
             self.assertTrue(blob_hash not in node._dataStore._dict)  # the looping call should have fired after
+
+
+class TestStoringNodeWentStale(TestKademliaBase):
+    network_size = 40
+
+    @defer.inlineCallbacks
+    def test_storing_node_went_stale_then_came_back(self):
+        blob_hash = generate_id()
+        announcing_node = self.nodes[20]
+        # announce the blob
+        announce_d = announcing_node.announceHaveBlob(blob_hash)
+        announce_time = self.clock.seconds()
+        self.pump_clock(5)
+        storing_node_ids = yield announce_d
+        all_nodes = set(self.nodes).union(set(self._seeds))
+
+        # verify the nodes we think stored it did actually store it
+        storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids]
+        self.assertEquals(len(storing_nodes), len(storing_node_ids))
+        self.assertEquals(len(storing_nodes), constants.k)
+
+        for node in storing_nodes:
+            self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port),
+                                  node._dataStore.getStoringContacts()), [(announcing_node.node_id,
+                                                                           announcing_node.externalIP,
+                                                                           announcing_node.port)])
+            self.assertEquals(len(datastore_result), 1)
+            expanded_peers = []
+            for peer in datastore_result:
+                host = ".".join([str(ord(d)) for d in peer[:4]])
+                port, = struct.unpack('>H', peer[4:6])
+                peer_node_id = peer[6:]
+                if (host, port, peer_node_id) not in expanded_peers:
+                    expanded_peers.append((peer_node_id, host, port))
+            self.assertEquals(expanded_peers[0],
+                              (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
+
+        self.nodes.remove(announcing_node)
+        yield self.run_reactor(1, [announcing_node.stop()])
+
+        # run the network for an hour, which should expire the removed node and the announced value
+        self.pump_clock(3600)
+        self.verify_all_nodes_are_routable()
+        self.verify_all_nodes_are_pingable()
+
+        for node in storing_nodes:  # make sure the contact isn't returned as a peer for the blob, but that
+                                    # we still have the entry in the datastore in case the node returns
+            self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 0)
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
+            self.assertTrue(blob_hash in node._dataStore._dict)
+
+        # bring the announcing node back online
+        self.nodes.append(announcing_node)
+        yield self.run_reactor(
+            31, [announcing_node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])]
+        )
+        self.pump_clock(24*60+1)  # FIXME: this should work after 12 minutes + 1 second, yet it doesnt
+        self.verify_all_nodes_are_routable()
+        self.verify_all_nodes_are_pingable()
+
+        # now the announcing node should once again be returned as a peer for the blob
+        for node in storing_nodes:
+            self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 1)
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
+            self.assertTrue(blob_hash in node._dataStore._dict)
+
+        # TODO: handle the case where the announcing node re joins with a different address from what is stored
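
For reference, the compact peer format these tests decode is 4 IP octets, a 2-byte big-endian port, and the node id. A Python 3 worked example with placeholder bytes, mirroring the `expanded_peers` loop:

```python
import struct

peer = bytes([10, 0, 0, 2]) + struct.pack('>H', 3333) + b'\xaa' * 48
host = ".".join(str(octet) for octet in peer[:4])   # "10.0.0.2"
port, = struct.unpack('>H', peer[4:6])              # 3333
peer_node_id = peer[6:]                             # 48-byte node id
assert (host, port, len(peer_node_id)) == ("10.0.0.2", 3333, 48)
```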