refactor announceHaveBlob
-add cached `token` to Contact objects to minimize findValue requests -remove self_store, always store to remote contacts even if we're the closest known node to the hash -move the store call and error handling from announceHaveBlob to a smaller function of its own
This commit is contained in:
parent
0386bfadf0
commit
42eb172638
2 changed files with 39 additions and 44 deletions
|
@ -35,6 +35,15 @@ class _Contact(object):
|
||||||
self.lastReplied = None
|
self.lastReplied = None
|
||||||
self.lastRequested = None
|
self.lastRequested = None
|
||||||
self.protocolVersion = constants.protocolVersion
|
self.protocolVersion = constants.protocolVersion
|
||||||
|
self._token = (None, 0) # token, timestamp
|
||||||
|
|
||||||
|
def update_token(self, token):
|
||||||
|
self._token = token, self.getTime()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def token(self):
|
||||||
|
# expire the token 1 minute early to be safe
|
||||||
|
return self._token[0] if self._token[1] + 240 > self.getTime() else None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def lastInteracted(self):
|
def lastInteracted(self):
|
||||||
|
|
|
@ -304,34 +304,17 @@ class Node(MockKademliaHelper):
|
||||||
return self._routingTable.bucketsWithContacts()
|
return self._routingTable.bucketsWithContacts()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def announceHaveBlob(self, blob_hash):
|
def storeToContact(self, blob_hash, contact):
|
||||||
known_nodes = {}
|
|
||||||
contacts = yield self.iterativeFindNode(blob_hash)
|
|
||||||
# store locally if we're the closest node and there are less than k contacts to try storing to
|
|
||||||
if self.externalIP is not None and contacts and len(contacts) < constants.k:
|
|
||||||
is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
|
|
||||||
if is_closer:
|
|
||||||
contacts.pop()
|
|
||||||
self_contact = self.contact_manager.make_contact(self.node_id, self.externalIP,
|
|
||||||
self.port, self._protocol)
|
|
||||||
token = self.make_token(self_contact.compact_ip())
|
|
||||||
yield self.store(self_contact, blob_hash, token, self.peerPort, self.node_id, 0)
|
|
||||||
elif self.externalIP is not None:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise Exception("Cannot determine external IP: %s" % self.externalIP)
|
|
||||||
|
|
||||||
contacted = []
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def announce_to_contact(contact):
|
|
||||||
known_nodes[contact.id] = contact
|
|
||||||
try:
|
try:
|
||||||
response = yield contact.findValue(blob_hash)
|
token = contact.token
|
||||||
res = yield contact.store(blob_hash, response['token'], self.peerPort, self.node_id, 0)
|
if not token:
|
||||||
|
find_value_response = yield contact.findValue(blob_hash)
|
||||||
|
token = find_value_response['token']
|
||||||
|
contact.update_token(token)
|
||||||
|
res = yield contact.store(blob_hash, token, self.peerPort, self.node_id, 0)
|
||||||
if res != "OK":
|
if res != "OK":
|
||||||
raise ValueError(res)
|
raise ValueError(res)
|
||||||
contacted.append(contact)
|
defer.returnValue(True)
|
||||||
log.debug("Stored %s to %s (%s)", binascii.hexlify(blob_hash), contact.log_id(), contact.address)
|
log.debug("Stored %s to %s (%s)", binascii.hexlify(blob_hash), contact.log_id(), contact.address)
|
||||||
except protocol.TimeoutError:
|
except protocol.TimeoutError:
|
||||||
log.debug("Timeout while storing blob_hash %s at %s",
|
log.debug("Timeout while storing blob_hash %s at %s",
|
||||||
|
@ -341,17 +324,20 @@ class Node(MockKademliaHelper):
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.error("Unexpected error while storing blob_hash %s at %s: %s",
|
log.error("Unexpected error while storing blob_hash %s at %s: %s",
|
||||||
binascii.hexlify(blob_hash), contact, err)
|
binascii.hexlify(blob_hash), contact, err)
|
||||||
|
defer.returnValue(False)
|
||||||
|
|
||||||
dl = []
|
@defer.inlineCallbacks
|
||||||
for c in contacts:
|
def announceHaveBlob(self, blob_hash):
|
||||||
dl.append(announce_to_contact(c))
|
contacts = yield self.iterativeFindNode(blob_hash)
|
||||||
|
|
||||||
yield defer.DeferredList(dl)
|
|
||||||
|
|
||||||
|
if not self.externalIP:
|
||||||
|
raise Exception("Cannot determine external IP: %s" % self.externalIP)
|
||||||
|
stored_to = yield DeferredDict({contact: self.storeToContact(blob_hash, contact) for contact in contacts})
|
||||||
|
contacted_node_ids = map(
|
||||||
|
lambda contact: contact.id.encode('hex'), filter(lambda contact: stored_to[contact], stored_to.keys())
|
||||||
|
)
|
||||||
log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(blob_hash),
|
log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(blob_hash),
|
||||||
len(contacted), len(contacts))
|
len(contacted_node_ids), len(contacts))
|
||||||
|
|
||||||
contacted_node_ids = [c.id.encode('hex') for c in contacted]
|
|
||||||
defer.returnValue(contacted_node_ids)
|
defer.returnValue(contacted_node_ids)
|
||||||
|
|
||||||
def change_token(self):
|
def change_token(self):
|
||||||
|
|
Loading…
Reference in a new issue