forked from LBRYCommunity/lbry-sdk
simplify announceHaveBlob, remove unused getPeersForBlob
This commit is contained in:
parent
372fb45e06
commit
aee7a3aa38
1 changed files with 30 additions and 37 deletions
|
@ -293,34 +293,9 @@ class Node(MockKademliaHelper):
|
|||
|
||||
def bucketsWithContacts(self):
|
||||
return self._routingTable.bucketsWithContacts()
|
||||
def announceHaveBlob(self, key):
|
||||
return self.iterativeAnnounceHaveBlob(
|
||||
key, {
|
||||
'port': self.peerPort,
|
||||
'lbryid': self.node_id,
|
||||
}
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def getPeersForBlob(self, blob_hash, include_node_ids=False):
|
||||
result = yield self.iterativeFindValue(blob_hash)
|
||||
expanded_peers = []
|
||||
if result:
|
||||
if blob_hash in result:
|
||||
for peer in result[blob_hash]:
|
||||
host = ".".join([str(ord(d)) for d in peer[:4]])
|
||||
port, = struct.unpack('>H', peer[4:6])
|
||||
if not include_node_ids:
|
||||
if (host, port) not in expanded_peers:
|
||||
expanded_peers.append((host, port))
|
||||
else:
|
||||
peer_node_id = peer[6:].encode('hex')
|
||||
if (host, port, peer_node_id) not in expanded_peers:
|
||||
expanded_peers.append((host, port, peer_node_id))
|
||||
defer.returnValue(expanded_peers)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def iterativeAnnounceHaveBlob(self, blob_hash, value):
|
||||
def announceHaveBlob(self, blob_hash):
|
||||
known_nodes = {}
|
||||
contacts = yield self.iterativeFindNode(blob_hash)
|
||||
# store locally if we're the closest node and there are less than k contacts to try storing to
|
||||
|
@ -344,17 +319,14 @@ class Node(MockKademliaHelper):
|
|||
known_nodes[contact.id] = contact
|
||||
try:
|
||||
responseMsg, originAddress = yield contact.findValue(blob_hash, rawResponse=True)
|
||||
if responseMsg.nodeID != contact.id:
|
||||
raise Exception("node id mismatch")
|
||||
value['token'] = responseMsg.response['token']
|
||||
res = yield contact.store(blob_hash, value)
|
||||
res = yield contact.store(blob_hash, responseMsg.response['token'], self.peerPort)
|
||||
if res != "OK":
|
||||
raise ValueError(res)
|
||||
contacted.append(contact)
|
||||
log.debug("Stored %s to %s (%s)", blob_hash.encode('hex'), contact.id.encode('hex'), originAddress[0])
|
||||
except protocol.TimeoutError:
|
||||
log.debug("Timeout while storing blob_hash %s at %s",
|
||||
blob_hash.encode('hex')[:16], contact.id.encode('hex'))
|
||||
blob_hash.encode('hex')[:16], contact.log_id())
|
||||
except ValueError as err:
|
||||
log.error("Unexpected response: %s" % err.message)
|
||||
except Exception as err:
|
||||
|
@ -430,12 +402,15 @@ class Node(MockKademliaHelper):
|
|||
@rtype: twisted.internet.defer.Deferred
|
||||
"""
|
||||
|
||||
if len(key) != constants.key_bits / 8:
|
||||
raise ValueError("invalid key length!")
|
||||
|
||||
# Execute the search
|
||||
iterative_find_result = yield self._iterativeFind(key, rpc='findValue')
|
||||
if isinstance(iterative_find_result, dict):
|
||||
find_result = yield self._iterativeFind(key, rpc='findValue')
|
||||
if isinstance(find_result, dict):
|
||||
# We have found the value; now see who was the closest contact without it...
|
||||
# ...and store the key/value pair
|
||||
defer.returnValue(iterative_find_result)
|
||||
pass
|
||||
else:
|
||||
# The value wasn't found, but a list of contacts was returned
|
||||
# Now, see if we have the value (it might seem wasteful to search on the network
|
||||
|
@ -445,10 +420,28 @@ class Node(MockKademliaHelper):
|
|||
# Ok, we have the value locally, so use that
|
||||
# Send this value to the closest node without it
|
||||
peers = self._dataStore.getPeersForBlob(key)
|
||||
defer.returnValue({key: peers})
|
||||
find_result = {key: peers}
|
||||
else:
|
||||
# Ok, value does not exist in DHT at all
|
||||
defer.returnValue(iterative_find_result)
|
||||
pass
|
||||
|
||||
expanded_peers = []
|
||||
if find_result:
|
||||
if key in find_result:
|
||||
for peer in find_result[key]:
|
||||
host = ".".join([str(ord(d)) for d in peer[:4]])
|
||||
port, = struct.unpack('>H', peer[4:6])
|
||||
peer_node_id = peer[6:]
|
||||
if (host, port, peer_node_id) not in expanded_peers:
|
||||
expanded_peers.append((peer_node_id, host, port))
|
||||
# TODO: get this working
|
||||
# if 'closestNodeNoValue' in find_result:
|
||||
# closest_node_without_value = find_result['closestNodeNoValue']
|
||||
# try:
|
||||
# response, address = yield closest_node_without_value.findValue(key, rawResponse=True)
|
||||
# yield closest_node_without_value.store(key, response.response['token'], self.peerPort)
|
||||
# except TimeoutError:
|
||||
# pass
|
||||
defer.returnValue(expanded_peers)
|
||||
|
||||
def addContact(self, contact):
|
||||
""" Add/update the given contact; simple wrapper for the same method
|
||||
|
|
Loading…
Reference in a new issue