diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py index 0121afdc3..cd010d0c8 100644 --- a/lbrynet/dht/contact.py +++ b/lbrynet/dht/contact.py @@ -35,6 +35,15 @@ class _Contact(object): self.lastReplied = None self.lastRequested = None self.protocolVersion = constants.protocolVersion + self._token = (None, 0) # token, timestamp + + def update_token(self, token): + self._token = token, self.getTime() + + @property + def token(self): + # expire the token 1 minute early to be safe + return self._token[0] if self._token[1] + 240 > self.getTime() else None @property def lastInteracted(self): diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 200d31ade..18c8f8f41 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -303,55 +303,41 @@ class Node(MockKademliaHelper): def bucketsWithContacts(self): return self._routingTable.bucketsWithContacts() + @defer.inlineCallbacks + def storeToContact(self, blob_hash, contact): + try: + token = contact.token + if not token: + find_value_response = yield contact.findValue(blob_hash) + token = find_value_response['token'] + contact.update_token(token) + res = yield contact.store(blob_hash, token, self.peerPort, self.node_id, 0) + if res != "OK": + raise ValueError(res) + defer.returnValue(True) + log.debug("Stored %s to %s (%s)", binascii.hexlify(blob_hash), contact.log_id(), contact.address) + except protocol.TimeoutError: + log.debug("Timeout while storing blob_hash %s at %s", + binascii.hexlify(blob_hash), contact.log_id()) + except ValueError as err: + log.error("Unexpected response: %s" % err.message) + except Exception as err: + log.error("Unexpected error while storing blob_hash %s at %s: %s", + binascii.hexlify(blob_hash), contact, err) + defer.returnValue(False) + @defer.inlineCallbacks def announceHaveBlob(self, blob_hash): - known_nodes = {} contacts = yield self.iterativeFindNode(blob_hash) - # store locally if we're the closest node and there are less than k contacts to try storing to - if self.externalIP is not None and contacts and len(contacts) < constants.k: - is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id) - if is_closer: - contacts.pop() - self_contact = self.contact_manager.make_contact(self.node_id, self.externalIP, - self.port, self._protocol) - token = self.make_token(self_contact.compact_ip()) - yield self.store(self_contact, blob_hash, token, self.peerPort, self.node_id, 0) - elif self.externalIP is not None: - pass - else: + + if not self.externalIP: raise Exception("Cannot determine external IP: %s" % self.externalIP) - - contacted = [] - - @defer.inlineCallbacks - def announce_to_contact(contact): - known_nodes[contact.id] = contact - try: - response = yield contact.findValue(blob_hash) - res = yield contact.store(blob_hash, response['token'], self.peerPort, self.node_id, 0) - if res != "OK": - raise ValueError(res) - contacted.append(contact) - log.debug("Stored %s to %s (%s)", binascii.hexlify(blob_hash), contact.log_id(), contact.address) - except protocol.TimeoutError: - log.debug("Timeout while storing blob_hash %s at %s", - binascii.hexlify(blob_hash), contact.log_id()) - except ValueError as err: - log.error("Unexpected response: %s" % err.message) - except Exception as err: - log.error("Unexpected error while storing blob_hash %s at %s: %s", - binascii.hexlify(blob_hash), contact, err) - - dl = [] - for c in contacts: - dl.append(announce_to_contact(c)) - - yield defer.DeferredList(dl) - + stored_to = yield DeferredDict({contact: self.storeToContact(blob_hash, contact) for contact in contacts}) + contacted_node_ids = map( + lambda contact: contact.id.encode('hex'), filter(lambda contact: stored_to[contact], stored_to.keys()) + ) log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(blob_hash), - len(contacted), len(contacts)) - - contacted_node_ids = [c.id.encode('hex') for c in contacted] + len(contacted_node_ids), len(contacts)) defer.returnValue(contacted_node_ids) def change_token(self):