forked from LBRYCommunity/lbry-sdk
refactor iterativeAnnounceHaveBlob
- change to only self_store if the number of contacts to store to is less than k and we are the closest node to the hash
This commit is contained in:
parent 14f9bb7b82
commit ea0ea704a2
1 changed file with 44 additions and 65 deletions
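The heart of the change is the self-store rule named in the commit message: the node now stores the blob locally only when the iterative search returned fewer than k contacts and the node itself is closer to the hash than the furthest contact found (the code's use of contacts[-1] implies the search results come back sorted nearest-first). Below is a rough standalone sketch of that rule, not taken from the commit; the Distance class is a simplified stand-in for the helper the diff uses, and k=8 and the contact objects are assumptions:

class Distance(object):
    # simplified stand-in: XOR distance from a target key (python 2,
    # matching the codebase's str keys and .encode('hex'))
    def __init__(self, key):
        self.val_key_one = long(key.encode('hex'), 16)

    def __call__(self, key_two):
        return self.val_key_one ^ long(key_two.encode('hex'), 16)

    def is_closer(self, a, b):
        # True if `a` is closer to the target key than `b` is
        return self(a) < self(b)


def should_self_store(node_id, blob_hash, contacts, k=8):
    # store locally only when fewer than k contacts were found and we are
    # closer to the hash than the furthest of them (contacts[-1])
    if not contacts or len(contacts) >= k:
        return False
    return Distance(blob_hash).is_closer(node_id, contacts[-1].id)

When the rule fires, the diff pops the furthest contact and the local node takes its place among the announce targets.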
@@ -272,79 +272,58 @@ class Node(object):
     def get_bandwidth_stats(self):
         return self._protocol.bandwidth_stats
 
+    @defer.inlineCallbacks
     def iterativeAnnounceHaveBlob(self, blob_hash, value):
         known_nodes = {}
+        contacts = yield self.iterativeFindNode(blob_hash)
+        # store locally if we're the closest node and there are less than k contacts to try storing to
+        if self.externalIP is not None and contacts and len(contacts) < constants.k:
+            is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
+            if is_closer:
+                contacts.pop()
+                yield self.store(blob_hash, value, originalPublisherID=self.node_id,
+                                 self_store=True)
+        elif self.externalIP is not None:
+            pass
+        else:
+            raise Exception("Cannot determine external IP: %s" % self.externalIP)
+        contacted = []
 
-        @defer.inlineCallbacks
-        def announce_to_peer(responseTuple):
-            """ @type responseMsg: kademlia.msgtypes.ResponseMessage """
-            # The "raw response" tuple contains the response message,
-            # and the originating address info
-            responseMsg = responseTuple[0]
-            originAddress = responseTuple[1]  # tuple: (ip adress, udp port)
-            # Make sure the responding node is valid, and abort the operation if it isn't
-            if not responseMsg.nodeID in known_nodes:
-                log.warning("Responding node was not expected")
-                defer.returnValue(responseMsg.nodeID)
-            remote_node = known_nodes[responseMsg.nodeID]
-
-            result = responseMsg.response
-            announced = False
-            if 'token' in result:
-                value['token'] = result['token']
-                try:
-                    res = yield remote_node.store(blob_hash, value)
-                    assert res == "OK", "unexpected response: {}".format(res)
-                    announced = True
-                except protocol.TimeoutError:
-                    log.info("Timeout while storing blob_hash %s at %s",
-                             blob_hash.encode('hex')[:16], remote_node.id.encode('hex'))
-                except Exception as err:
-                    log.error("Unexpected error while storing blob_hash %s at %s: %s",
-                              blob_hash.encode('hex')[:16], remote_node.id.encode('hex'), err)
-            else:
-                log.warning("missing token")
-            defer.returnValue(announced)
-
-        @defer.inlineCallbacks
-        def requestPeers(contacts):
-            if self.externalIP is not None and len(contacts) >= constants.k:
-                is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
-                if is_closer:
-                    contacts.pop()
-                    yield self.store(blob_hash, value, originalPublisherID=self.node_id,
-                                     self_store=True)
-            elif self.externalIP is not None:
-                yield self.store(blob_hash, value, originalPublisherID=self.node_id,
-                                 self_store=True)
-            else:
-                raise Exception("Cannot determine external IP: %s" % self.externalIP)
-            contacted = []
-            for contact in contacts:
-                known_nodes[contact.id] = contact
-                rpcMethod = getattr(contact, "findValue")
-                try:
-                    response = yield rpcMethod(blob_hash, rawResponse=True)
-                    stored = yield announce_to_peer(response)
-                    if stored:
-                        contacted.append(contact)
-                except protocol.TimeoutError:
-                    log.debug("Timeout while storing blob_hash %s at %s",
-                              binascii.hexlify(blob_hash), contact)
-                except Exception as err:
-                    log.error("Unexpected error while storing blob_hash %s at %s: %s",
-                              binascii.hexlify(blob_hash), contact, err)
-            log.debug("Stored %s to %i of %i attempted peers", blob_hash.encode('hex')[:16],
-                      len(contacted), len(contacts))
-            contacted_node_ids = [c.id.encode('hex') for c in contacts]
-            defer.returnValue(contacted_node_ids)
-
-        d = self.iterativeFindNode(blob_hash)
-        d.addCallback(requestPeers)
-        return d
+        @defer.inlineCallbacks
+        def announce_to_contact(contact):
+            known_nodes[contact.id] = contact
+            try:
+                responseMsg, originAddress = yield contact.findValue(blob_hash, rawResponse=True)
+                if responseMsg.nodeID != contact.id:
+                    raise Exception("node id mismatch")
+                value['token'] = responseMsg.response['token']
+                res = yield contact.store(blob_hash, value)
+                if res != "OK":
+                    raise ValueError(res)
+                contacted.append(contact)
+                log.debug("Stored %s to %s (%s)", blob_hash.encode('hex'), contact.id.encode('hex'), originAddress[0])
+            except protocol.TimeoutError:
+                log.debug("Timeout while storing blob_hash %s at %s",
+                          blob_hash.encode('hex')[:16], contact.id.encode('hex'))
+            except ValueError as err:
+                log.error("Unexpected response: %s" % err.message)
+            except Exception as err:
+                log.error("Unexpected error while storing blob_hash %s at %s: %s",
+                          binascii.hexlify(blob_hash), contact, err)
+
+        dl = []
+        for c in contacts:
+            dl.append(announce_to_contact(c))
+
+        yield defer.DeferredList(dl)
+
+        log.debug("Stored %s to %i of %i attempted peers", blob_hash.encode('hex')[:16],
+                  len(contacted), len(contacts))
+
+        contacted_node_ids = [c.id.encode('hex') for c in contacted]
+        defer.returnValue(contacted_node_ids)
 
     def change_token(self):
         self.old_token_secret = self.token_secret
         self.token_secret = self._generateID()
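The other behavioural change is concurrency: the old requestPeers yielded on each contact in turn, so one slow peer stalled the whole announce, while the new code builds one inlineCallbacks deferred per contact and waits on a single DeferredList. A minimal self-contained illustration of that fan-out pattern follows; FakeContact is a hypothetical stand-in, not a real kademlia contact:

from twisted.internet import defer, task, reactor


class FakeContact(object):
    # hypothetical stand-in: store() resolves to "OK" after `delay` seconds
    def __init__(self, name, delay):
        self.name, self.delay = name, delay

    def store(self):
        return task.deferLater(reactor, self.delay, lambda: "OK")


@defer.inlineCallbacks
def announce_all(contacts):
    contacted = []

    @defer.inlineCallbacks
    def announce(contact):
        # errors stay contained per contact, as in announce_to_contact above
        try:
            res = yield contact.store()
            if res == "OK":
                contacted.append(contact.name)
        except Exception:
            pass

    # fan out: every announce starts immediately, so total time is roughly
    # the slowest peer rather than the sum of all peers
    yield defer.DeferredList([announce(c) for c in contacts])
    defer.returnValue(contacted)

Driven from a running reactor, three contacts with delays of 0.1, 0.3 and 0.5 seconds finish together in about half a second, where a sequential loop like the old one would take close to a second.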