diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py index cba054e0d..2ee26c678 100644 --- a/lbrynet/dht/contact.py +++ b/lbrynet/dht/contact.py @@ -1,19 +1,78 @@ -class Contact(object): +from lbrynet.dht import constants + + +class _Contact(object): """ Encapsulation for remote contact This class contains information on a single remote contact, and also provides a direct RPC API to the remote node which it represents """ - def __init__(self, id, ipAddress, udpPort, networkProtocol, firstComm=0): - self.id = id + def __init__(self, contactManager, id, ipAddress, udpPort, networkProtocol, firstComm): + self._contactManager = contactManager + self._id = id + if id is not None: + if not len(id) == constants.key_bits / 8: + raise ValueError("invalid node id: %s", id.encode('hex')) self.address = ipAddress self.port = udpPort self._networkProtocol = networkProtocol self.commTime = firstComm + self.getTime = self._contactManager._get_time + self.lastReplied = None + self.lastRequested = None + + @property + def lastInteracted(self): + return max(self.lastRequested or 0, self.lastReplied or 0, self.lastFailed or 0) + + @property + def id(self): + return self._id + + def log_id(self, short=True): + if not self.id: + return "not initialized" + id_hex = self.id.encode('hex') + return id_hex if not short else id_hex[:8] + + @property + def failedRPCs(self): + return len(self.failures) + + @property + def lastFailed(self): + return self._contactManager._rpc_failures.get((self.address, self.port), [None])[-1] + + @property + def failures(self): + return self._contactManager._rpc_failures.get((self.address, self.port), []) + + @property + def contact_is_good(self): + """ + :return: False if contact is bad, None if contact is unknown, or True if contact is good + """ + failures = self.failures + now = self.getTime() + delay = constants.refreshTimeout / 4 + + if failures: + if self.lastReplied and len(failures) >= 2 and self.lastReplied < failures[-2]: + return False + elif self.lastReplied and len(failures) >= 2 and self.lastReplied > failures[-2]: + pass # handled below + elif len(failures) >= 2: + return False + + if self.lastReplied and self.lastReplied > now - delay: + return True + if self.lastReplied and self.lastRequested and self.lastRequested > now - delay: + return True + return None def __eq__(self, other): - if isinstance(other, Contact): + if isinstance(other, _Contact): return self.id == other.id elif isinstance(other, str): return self.id == other @@ -21,7 +80,7 @@ class Contact(object): return False def __ne__(self, other): - if isinstance(other, Contact): + if isinstance(other, _Contact): return self.id != other.id elif isinstance(other, str): return self.id != other @@ -33,6 +92,21 @@ class Contact(object): lambda buff, x: buff + bytearray([int(x)]), self.address.split('.'), bytearray()) return str(compact_ip) + def set_id(self, id): + if not self._id: + self._id = id + + def update_last_replied(self): + self.lastReplied = int(self.getTime()) + + def update_last_requested(self): + self.lastRequested = int(self.getTime()) + + def update_last_failed(self): + failures = self._contactManager._rpc_failures.get((self.address, self.port), []) + failures.append(self.getTime()) + self._contactManager._rpc_failures[(self.address, self.port)] = failures + def __str__(self): return '<%s.%s object; IP address: %s, UDP port: %d>' % ( self.__module__, self.__class__.__name__, self.address, self.port) @@ -56,3 +130,31 @@ class Contact(object): return self._networkProtocol.sendRPC(self, name, args, **kwargs) return _sendRPC + + +class ContactManager(object): + def __init__(self, get_time=None): + if not get_time: + from twisted.internet import reactor + get_time = reactor.seconds + self._get_time = get_time + self._contacts = {} + self._rpc_failures = {} + + def get_contact(self, id, address, port): + for contact in self._contacts.itervalues(): + if contact.id == id and contact.address == address and contact.port == port: + return contact + + def make_contact(self, id, ipAddress, udpPort, networkProtocol, firstComm=0): + ipAddress = str(ipAddress) + contact = self.get_contact(id, ipAddress, udpPort) + if contact: + return contact + contact = _Contact(self, id, ipAddress, udpPort, networkProtocol, firstComm or self._get_time()) + self._contacts[(id, ipAddress, udpPort)] = contact + return contact + + def is_ignored(self, origin_tuple): + failed_rpc_count = len(self._rpc_failures.get(origin_tuple, [])) + return failed_rpc_count > constants.rpcAttempts diff --git a/lbrynet/dht/error.py b/lbrynet/dht/error.py index 3111adf8f..3d44cf3f1 100644 --- a/lbrynet/dht/error.py +++ b/lbrynet/dht/error.py @@ -33,6 +33,9 @@ class TimeoutError(Exception): def __init__(self, remote_contact_id): # remote_contact_id is a binary blob so we need to convert it # into something more readable - msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id)) + if remote_contact_id: + msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id)) + else: + msg = 'Timeout connecting to uninitialized node' Exception.__init__(self, msg) self.remote_contact_id = remote_contact_id diff --git a/lbrynet/dht/kbucket.py b/lbrynet/dht/kbucket.py index de5484bb0..bb4cfc0dc 100644 --- a/lbrynet/dht/kbucket.py +++ b/lbrynet/dht/kbucket.py @@ -42,9 +42,19 @@ class KBucket(object): raise BucketFull("No space in bucket to insert contact") def getContact(self, contactID): - """ Get the contact specified node ID""" - index = self._contacts.index(contactID) - return self._contacts[index] + """Get the contact specified node ID + + @raise IndexError: raised if the contact is not in the bucket + + @param contactID: the node id of the contact to retrieve + @type contactID: str + + @rtype: dht.contact._Contact + """ + for contact in self._contacts: + if contact.id == contactID: + return contact + raise IndexError(contactID) def getContacts(self, count=-1, excludeContact=None): """ Returns a list containing up to the first count number of contacts @@ -92,14 +102,18 @@ class KBucket(object): if excludeContact in contactList: contactList.remove(excludeContact) + def getBadOrUnknownContacts(self): + contacts = self.getContacts(sort_distance_to=False) + results = [contact for contact in contacts if contact.contact_is_good is False] + results.extend(contact for contact in contacts if contact.contact_is_good is None) + return results return contactList def removeContact(self, contact): - """ Remove given contact from list + """ Remove the contact from the bucket - @param contact: The contact to remove, or a string containing the - contact's node ID - @type contact: kademlia.contact.Contact or str + @param contact: The contact to remove + @type contact: dht.contact._Contact @raise ValueError: The specified contact is not in this bucket """ @@ -124,3 +138,6 @@ class KBucket(object): def __len__(self): return len(self._contacts) + + def __contains__(self, item): + return item in self._contacts diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 31f1b238a..b24b923d1 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -24,7 +24,7 @@ import datastore import protocol from error import TimeoutError from peerfinder import DHTPeerFinder -from contact import Contact +from contact import ContactManager from distance import Distance @@ -51,6 +51,7 @@ class MockKademliaHelper(object): clock = clock or reactor self.clock = clock + self.contact_manager = ContactManager(self.clock.seconds) self.reactor_listenUDP = listenUDP self.reactor_resolve = resolve @@ -276,8 +277,10 @@ class Node(MockKademliaHelper): is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id) if is_closer: contacts.pop() - yield self.store(blob_hash, value, originalPublisherID=self.node_id, - self_store=True) + self_contact = self.contact_manager.make_contact(self.node_id, self.externalIP, + self.port, self._protocol) + token = self.make_token(self_contact.compact_ip()) + yield self.store(self_contact, blob_hash, token, self.peerPort) elif self.externalIP is not None: pass else: @@ -403,17 +406,17 @@ class Node(MockKademliaHelper): @param contact: The contact to add to this node's k-buckets @type contact: kademlia.contact.Contact """ - self._routingTable.addContact(contact) + return self._routingTable.addContact(contact) - def removeContact(self, contactID): + def removeContact(self, contact): """ Remove the contact with the specified node ID from this node's table of known nodes. This is a simple wrapper for the same method in this object's RoutingTable object - @param contactID: The node ID of the contact to remove - @type contactID: str + @param contact: The Contact object to remove + @type contact: _Contact """ - self._routingTable.removeContact(contactID) + self._routingTable.removeContact(contact) def findContact(self, contactID): """ Find a entangled.kademlia.contact.Contact object for the specified @@ -430,10 +433,11 @@ class Node(MockKademliaHelper): contact = self._routingTable.getContact(contactID) df = defer.Deferred() df.callback(contact) - except ValueError: + except (ValueError, IndexError): def parseResults(nodes): + node_ids = [c.id for c in nodes] if contactID in nodes: - contact = nodes[nodes.index(contactID)] + contact = nodes[node_ids.index(contactID)] return contact else: return None @@ -451,11 +455,11 @@ class Node(MockKademliaHelper): return 'pong' @rpcmethod - def store(self, key, value, originalPublisherID=None, self_store=False, **kwargs): + def store(self, rpc_contact, blob_hash, token, port, originalPublisherID=None, age=0): """ Store the received data in this node's local hash table - @param key: The hashtable key of the data - @type key: str + @param blob_hash: The hashtable key of the data + @type blob_hash: str @param value: The actual data (the value associated with C{key}) @type value: str @param originalPublisherID: The node ID of the node that is the @@ -473,54 +477,24 @@ class Node(MockKademliaHelper): (which is the case currently) might not be a good idea... will have to fix this (perhaps use a stream from the Protocol class?) """ - # Get the sender's ID (if any) if originalPublisherID is None: - if '_rpcNodeID' in kwargs: - originalPublisherID = kwargs['_rpcNodeID'] - else: - raise TypeError, 'No NodeID given. Therefore we can\'t store this node' - - if self_store is True and self.externalIP: - contact = Contact(self.node_id, self.externalIP, self.port, None, None) - compact_ip = contact.compact_ip() - elif '_rpcNodeContact' in kwargs: - contact = kwargs['_rpcNodeContact'] - compact_ip = contact.compact_ip() + originalPublisherID = rpc_contact.id + compact_ip = rpc_contact.compact_ip() + if not self.verify_token(token, compact_ip): + raise ValueError("Invalid token") + if 0 <= port <= 65536: + compact_port = str(struct.pack('>H', port)) else: - raise TypeError, 'No contact info available' - - if not self_store: - if 'token' not in value: - raise ValueError("Missing token") - if not self.verify_token(value['token'], compact_ip): - raise ValueError("Invalid token") - - if 'port' in value: - port = int(value['port']) - if 0 <= port <= 65536: - compact_port = str(struct.pack('>H', port)) - else: - raise TypeError('Invalid port') - else: - raise TypeError('No port available') - - if 'lbryid' in value: - if len(value['lbryid']) != constants.key_bits / 8: - raise ValueError('Invalid lbryid (%i bytes): %s' % (len(value['lbryid']), - value['lbryid'].encode('hex'))) - else: - compact_address = compact_ip + compact_port + value['lbryid'] - else: - raise TypeError('No lbryid given') + raise TypeError('Invalid port') + compact_address = compact_ip + compact_port + rpc_contact.id now = int(time.time()) - originallyPublished = now # - age - self._dataStore.addPeerToBlob(key, compact_address, now, originallyPublished, - originalPublisherID) + originallyPublished = now - age + self._dataStore.addPeerToBlob(blob_hash, compact_address, now, originallyPublished, originalPublisherID) return 'OK' @rpcmethod - def findNode(self, key, **kwargs): + def findNode(self, rpc_contact, key): """ Finds a number of known nodes closest to the node/value with the specified key. @@ -533,20 +507,17 @@ class Node(MockKademliaHelper): node is returning all of the contacts that it knows of. @rtype: list """ + if len(key) != constants.key_bits / 8: + raise ValueError("invalid contact id length: %i" % len(key)) - # Get the sender's ID (if any) - if '_rpcNodeID' in kwargs: - rpc_sender_id = kwargs['_rpcNodeID'] - else: - rpc_sender_id = None - contacts = self._routingTable.findCloseNodes(key, constants.k, rpc_sender_id) + contacts = self._routingTable.findCloseNodes(key, constants.k, rpc_contact.id) contact_triples = [] for contact in contacts: contact_triples.append((contact.id, contact.address, contact.port)) return contact_triples @rpcmethod - def findValue(self, key, **kwargs): + def findValue(self, rpc_contact, key): """ Return the value associated with the specified key if present in this node's data, otherwise execute FIND_NODE for the key @@ -558,16 +529,18 @@ class Node(MockKademliaHelper): @rtype: dict or list """ + if len(key) != constants.key_bits / 8: + raise ValueError("invalid blob hash length: %i" % len(key)) + + response = { + 'token': self.make_token(rpc_contact.compact_ip()), + } + if self._dataStore.hasPeersForBlob(key): - rval = {key: self._dataStore.getPeersForBlob(key)} + response[key] = self._dataStore.getPeersForBlob(key) else: - contact_triples = self.findNode(key, **kwargs) - rval = {'contacts': contact_triples} - if '_rpcNodeContact' in kwargs: - contact = kwargs['_rpcNodeContact'] - compact_ip = contact.compact_ip() - rval['token'] = self.make_token(compact_ip) - return rval + response['contacts'] = self.findNode(rpc_contact, key) + return response def _generateID(self): """ Generates an n-bit pseudo-random identifier @@ -606,13 +579,15 @@ class Node(MockKademliaHelper): return a list of the k closest nodes to the specified key @rtype: twisted.internet.defer.Deferred """ - findValue = rpc != 'findNode' + + if len(key) != constants.key_bits / 8: + raise ValueError("invalid key length: %i" % len(key)) if startupShortlist is None: shortlist = self._routingTable.findCloseNodes(key, constants.k) - if key != self.node_id: - # Update the "last accessed" timestamp for the appropriate k-bucket - self._routingTable.touchKBucket(key) + # if key != self.node_id: + # # Update the "last accessed" timestamp for the appropriate k-bucket + # self._routingTable.touchKBucket(key) if len(shortlist) == 0: log.warning("This node doesnt know any other nodes") # This node doesn't know of any other nodes @@ -621,7 +596,7 @@ class Node(MockKademliaHelper): result = yield fakeDf defer.returnValue(result) else: - # This is used during the bootstrap process; node ID's are most probably fake + # This is used during the bootstrap process shortlist = startupShortlist outerDf = defer.Deferred() diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index e1ca25d15..43a7a16f8 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -9,7 +9,6 @@ import constants import encoding import msgtypes import msgformat -from contact import Contact from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError log = logging.getLogger(__name__) @@ -29,7 +28,8 @@ class KademliaProtocol(protocol.DatagramProtocol): self._partialMessagesProgress = {} def sendRPC(self, contact, method, args, rawResponse=False): - """ Sends an RPC to the specified contact + """ + Sends an RPC to the specified contact @param contact: The contact (remote node) to send the RPC to @type contact: kademlia.contacts.Contact @@ -60,19 +60,39 @@ class KademliaProtocol(protocol.DatagramProtocol): encodedMsg = self._encoder.encode(msgPrimitive) if args: - log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex')) + log.debug("%s:%i SEND CALL %s(%s) TO %s:%i", self._node.externalIP, self._node.port, method, + args[0].encode('hex'), contact.address, contact.port) else: - log.debug("DHT SEND CALL %s", method) + log.debug("%s:%i SEND CALL %s TO %s:%i", self._node.externalIP, self._node.port, method, + contact.address, contact.port) df = defer.Deferred() if rawResponse: df._rpcRawResponse = True + def _remove_contact(failure): # remove the contact from the routing table and track the failure + try: + self._node.removeContact(contact) + except (ValueError, IndexError): + pass + contact.update_last_failed() + return failure + + def _update_contact(result): # refresh the contact in the routing table + contact.update_last_replied() + d = self._node.addContact(contact) + d.addCallback(lambda _: result) + return d + + df.addCallbacks(_update_contact, _remove_contact) + # Set the RPC timeout timer timeoutCall, cancelTimeout = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, msg.id) + # Transmit the data self._send(encodedMsg, msg.id, (contact.address, contact.port)) - self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args) + self._sentMessages[msg.id] = (contact, df, timeoutCall, cancelTimeout, method, args) + df.addErrback(cancelTimeout) return df @@ -115,46 +135,80 @@ class KademliaProtocol(protocol.DatagramProtocol): log.warning("Couldn't decode dht datagram from %s", address) return - remoteContact = Contact(message.nodeID, address[0], address[1], self) - - # Refresh the remote node's details in the local node's k-buckets - self._node.addContact(remoteContact) if isinstance(message, msgtypes.RequestMessage): # This is an RPC method request - self._handleRPC(remoteContact, message.id, message.request, message.args) + remoteContact = self._node.contact_manager.make_contact(message.nodeID, address[0], address[1], self) + remoteContact.update_last_requested() + # only add a requesting contact to the routing table if it has replied to one of our requests + if remoteContact.contact_is_good is True: + df = self._node.addContact(remoteContact) + else: + df = defer.succeed(None) + df.addCallback(lambda _: self._handleRPC(remoteContact, message.id, message.request, message.args)) + # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it + # will be added to our routing table if successful + if remoteContact.contact_is_good is None and remoteContact.lastReplied is None: + df.addCallback(lambda _: self._ping_queue.enqueue_maybe_ping(remoteContact)) + elif isinstance(message, msgtypes.ErrorMessage): + # The RPC request raised a remote exception; raise it locally + if message.exceptionType in BUILTIN_EXCEPTIONS: + exception_type = BUILTIN_EXCEPTIONS[message.exceptionType] + else: + exception_type = UnknownRemoteException + remoteException = exception_type(message.response) + log.error("DHT RECV REMOTE EXCEPTION FROM %s:%i: %s", address[0], + address[1], remoteException) + if message.id in self._sentMessages: + # Cancel timeout timer for this RPC + remoteContact, df, timeoutCall, timeoutCanceller, method = self._sentMessages[message.id][0:5] + timeoutCanceller() + del self._sentMessages[message.id] + # reject replies coming from a different address than what we sent our request to + if (remoteContact.address, remoteContact.port) != address: + log.warning("Sent request to node %s at %s:%i, got reply from %s:%i", + remoteContact.log_id(), remoteContact.address, + remoteContact.port, address[0], address[1]) + df.errback(TimeoutError(remoteContact.id)) + return + + # this error is returned by nodes that can be contacted but have an old + # and broken version of the ping command, if they return it the node can + # be contacted, so we'll treat it as a successful ping + old_ping_error = "ping() got an unexpected keyword argument '_rpcNodeContact'" + if isinstance(remoteException, TypeError) and \ + remoteException.message == old_ping_error: + log.debug("old pong error") + df.callback('pong') + else: + df.errback(remoteException) elif isinstance(message, msgtypes.ResponseMessage): # Find the message that triggered this response if message.id in self._sentMessages: # Cancel timeout timer for this RPC - df, timeoutCall = self._sentMessages[message.id][1:3] - timeoutCall.cancel() + remoteContact, df, timeoutCall, timeoutCanceller, method = self._sentMessages[message.id][0:5] + timeoutCanceller() del self._sentMessages[message.id] + log.debug("%s:%i RECV response to %s from %s:%i", self._node.externalIP, self._node.port, + method, remoteContact.address, remoteContact.port) + + # When joining the network we made Contact objects for the seed nodes with node ids set to None + # Thus, the sent_to_id will also be None, and the contact objects need the ids to be manually set. + # These replies have be distinguished from those where the node id in the datagram does not match + # the node id of the node we sent a message to (these messages are treated as an error) + if remoteContact.id and remoteContact.id != message.nodeID: # sent_to_id will be None for bootstrap + log.debug("mismatch: (%s) %s:%i (%s vs %s)", method, remoteContact.address, remoteContact.port, + remoteContact.log_id(False), message.nodeID.encode('hex')) + df.errback(TimeoutError(remoteContact.id)) + return + elif not remoteContact.id: + remoteContact.set_id(message.nodeID) if hasattr(df, '_rpcRawResponse'): # The RPC requested that the raw response message # and originating address be returned; do not # interpret it df.callback((message, address)) - elif isinstance(message, msgtypes.ErrorMessage): - # The RPC request raised a remote exception; raise it locally - if message.exceptionType in BUILTIN_EXCEPTIONS: - exception_type = BUILTIN_EXCEPTIONS[message.exceptionType] - else: - exception_type = UnknownRemoteException - remoteException = exception_type(message.response) - # this error is returned by nodes that can be contacted but have an old - # and broken version of the ping command, if they return it the node can - # be contacted, so we'll treat it as a successful ping - old_ping_error = "ping() got an unexpected keyword argument '_rpcNodeContact'" - if isinstance(remoteException, TypeError) and \ - remoteException.message == old_ping_error: - log.debug("old pong error") - df.callback('pong') - else: - log.error("DHT RECV REMOTE EXCEPTION FROM %s:%i: %s", address[0], - address[1], remoteException) - df.errback(remoteException) else: # We got a result from the RPC df.callback(message.response) @@ -259,28 +313,29 @@ class KademliaProtocol(protocol.DatagramProtocol): # Execute the RPC func = getattr(self._node, method, None) - if callable(func) and hasattr(func, 'rpcmethod'): + if callable(func) and hasattr(func, "rpcmethod"): # Call the exposed Node method and return the result to the deferred callback chain if args: - log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'), - senderContact.address, senderContact.port) + log.debug("%s:%i RECV CALL %s(%s) %s:%i", self._node.externalIP, self._node.port, method, + args[0].encode('hex'), senderContact.address, senderContact.port) else: - log.debug("DHT RECV CALL %s %s:%i", method, senderContact.address, - senderContact.port) + log.debug("%s:%i RECV CALL %s %s:%i", self._node.externalIP, self._node.port, method, + senderContact.address, senderContact.port) try: if method != 'ping': - kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} - result = func(*args, **kwargs) + result = func(senderContact, *args) else: result = func() except Exception, e: - log.exception("error handling request for %s: %s", senderContact.address, method) + log.exception("error handling request for %s:%i %s", senderContact.address, + senderContact.port, method) df.errback(e) else: df.callback(result) else: # No such exposed method df.errback(AttributeError('Invalid method: %s' % method)) + return df def _msgTimeout(self, messageID): """ Called when an RPC request message times out """ @@ -289,30 +344,30 @@ class KademliaProtocol(protocol.DatagramProtocol): # This should never be reached log.error("deferred timed out, but is not present in sent messages list!") return - remoteContactID, df, timeout_call, method, args = self._sentMessages[messageID] + remoteContact, df, timeout_call, timeout_canceller, method, args = self._sentMessages[messageID] if self._partialMessages.has_key(messageID): # We are still receiving this message - self._msgTimeoutInProgress(messageID, remoteContactID, df, method, args) + self._msgTimeoutInProgress(messageID, timeout_canceller, remoteContact, df, method, args) return del self._sentMessages[messageID] # The message's destination node is now considered to be dead; # raise an (asynchronous) TimeoutError exception and update the host node - self._node.removeContact(remoteContactID) - df.errback(TimeoutError(remoteContactID)) + df.errback(TimeoutError(remoteContact.id)) - def _msgTimeoutInProgress(self, messageID, remoteContactID, df, method, args): + def _msgTimeoutInProgress(self, messageID, timeoutCanceller, remoteContact, df, method, args): # See if any progress has been made; if not, kill the message if self._hasProgressBeenMade(messageID): # Reset the RPC timeout timer - timeoutCall, _ = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, messageID) - self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args) + timeoutCanceller() + timeoutCall, cancelTimeout = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, messageID) + self._sentMessages[messageID] = (remoteContact, df, timeoutCall, cancelTimeout, method, args) else: # No progress has been made if messageID in self._partialMessagesProgress: del self._partialMessagesProgress[messageID] if messageID in self._partialMessages: del self._partialMessages[messageID] - df.errback(TimeoutError(remoteContactID)) + df.errback(TimeoutError(remoteContact.id)) def _hasProgressBeenMade(self, messageID): return ( diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 16e3ef1cb..0b20fa621 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -202,16 +202,16 @@ class TreeRoutingTable(object): bucketIndex += 1 return refreshIDs - def removeContact(self, contactID): - """ Remove the contact with the specified node ID from the routing - table - - @param contactID: The node ID of the contact to remove - @type contactID: str + def removeContact(self, contact): """ - bucketIndex = self._kbucketIndex(contactID) + Remove the contact from the routing table + + @param contact: The contact to remove + @type contact: dht.contact._Contact + """ + bucketIndex = self._kbucketIndex(contact.id) try: - self._buckets[bucketIndex].removeContact(contactID) + self._buckets[bucketIndex].removeContact(contact) except ValueError: return