diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c7dbdf119..0adc4f7b6 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -272,78 +272,57 @@ class Node(object): def get_bandwidth_stats(self): return self._protocol.bandwidth_stats + @defer.inlineCallbacks def iterativeAnnounceHaveBlob(self, blob_hash, value): known_nodes = {} - - @defer.inlineCallbacks - def announce_to_peer(responseTuple): - """ @type responseMsg: kademlia.msgtypes.ResponseMessage """ - # The "raw response" tuple contains the response message, - # and the originating address info - responseMsg = responseTuple[0] - originAddress = responseTuple[1] # tuple: (ip adress, udp port) - # Make sure the responding node is valid, and abort the operation if it isn't - if not responseMsg.nodeID in known_nodes: - log.warning("Responding node was not expected") - defer.returnValue(responseMsg.nodeID) - remote_node = known_nodes[responseMsg.nodeID] - - result = responseMsg.response - announced = False - if 'token' in result: - value['token'] = result['token'] - try: - res = yield remote_node.store(blob_hash, value) - assert res == "OK", "unexpected response: {}".format(res) - announced = True - except protocol.TimeoutError: - log.info("Timeout while storing blob_hash %s at %s", - blob_hash.encode('hex')[:16], remote_node.id.encode('hex')) - except Exception as err: - log.error("Unexpected error while storing blob_hash %s at %s: %s", - blob_hash.encode('hex')[:16], remote_node.id.encode('hex'), err) - else: - log.warning("missing token") - defer.returnValue(announced) - - @defer.inlineCallbacks - def requestPeers(contacts): - if self.externalIP is not None and len(contacts) >= constants.k: - is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id) - if is_closer: - contacts.pop() - yield self.store(blob_hash, value, originalPublisherID=self.node_id, - self_store=True) - elif self.externalIP is not None: + contacts = yield self.iterativeFindNode(blob_hash) + # store locally if we're the closest node and there are less than k contacts to try storing to + if self.externalIP is not None and contacts and len(contacts) < constants.k: + is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id) + if is_closer: + contacts.pop() yield self.store(blob_hash, value, originalPublisherID=self.node_id, self_store=True) - else: - raise Exception("Cannot determine external IP: %s" % self.externalIP) + elif self.externalIP is not None: + pass + else: + raise Exception("Cannot determine external IP: %s" % self.externalIP) - contacted = [] - for contact in contacts: - known_nodes[contact.id] = contact - rpcMethod = getattr(contact, "findValue") - try: - response = yield rpcMethod(blob_hash, rawResponse=True) - stored = yield announce_to_peer(response) - if stored: - contacted.append(contact) - except protocol.TimeoutError: - log.debug("Timeout while storing blob_hash %s at %s", - binascii.hexlify(blob_hash), contact) - except Exception as err: - log.error("Unexpected error while storing blob_hash %s at %s: %s", - binascii.hexlify(blob_hash), contact, err) - log.debug("Stored %s to %i of %i attempted peers", blob_hash.encode('hex')[:16], - len(contacted), len(contacts)) + contacted = [] - contacted_node_ids = [c.id.encode('hex') for c in contacts] - defer.returnValue(contacted_node_ids) + @defer.inlineCallbacks + def announce_to_contact(contact): + known_nodes[contact.id] = contact + try: + responseMsg, originAddress = yield contact.findValue(blob_hash, rawResponse=True) + if responseMsg.nodeID != contact.id: + raise Exception("node id mismatch") + value['token'] = responseMsg.response['token'] + res = yield contact.store(blob_hash, value) + if res != "OK": + raise ValueError(res) + contacted.append(contact) + log.debug("Stored %s to %s (%s)", blob_hash.encode('hex'), contact.id.encode('hex'), originAddress[0]) + except protocol.TimeoutError: + log.debug("Timeout while storing blob_hash %s at %s", + blob_hash.encode('hex')[:16], contact.id.encode('hex')) + except ValueError as err: + log.error("Unexpected response: %s" % err.message) + except Exception as err: + log.error("Unexpected error while storing blob_hash %s at %s: %s", + binascii.hexlify(blob_hash), contact, err) - d = self.iterativeFindNode(blob_hash) - d.addCallback(requestPeers) - return d + dl = [] + for c in contacts: + dl.append(announce_to_contact(c)) + + yield defer.DeferredList(dl) + + log.debug("Stored %s to %i of %i attempted peers", blob_hash.encode('hex')[:16], + len(contacted), len(contacts)) + + contacted_node_ids = [c.id.encode('hex') for c in contacted] + defer.returnValue(contacted_node_ids) def change_token(self): self.old_token_secret = self.token_secret