forked from LBRYCommunity/lbry-sdk
Merge branch '1446'
This commit is contained in:
commit
673d259d7a
4 changed files with 35 additions and 9 deletions
|
@ -40,7 +40,7 @@ class _Contact:
|
||||||
self._token = (None, 0) # token, timestamp
|
self._token = (None, 0) # token, timestamp
|
||||||
|
|
||||||
def update_token(self, token):
|
def update_token(self, token):
|
||||||
self._token = token, self.getTime()
|
self._token = token, self.getTime() if token else 0
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def token(self):
|
def token(self):
|
||||||
|
|
|
@ -299,25 +299,24 @@ class Node(MockKademliaHelper):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def storeToContact(self, blob_hash, contact):
|
def storeToContact(self, blob_hash, contact):
|
||||||
try:
|
try:
|
||||||
token = contact.token
|
if not contact.token:
|
||||||
if not token:
|
yield contact.findValue(blob_hash)
|
||||||
find_value_response = yield contact.findValue(blob_hash)
|
res = yield contact.store(blob_hash, contact.token, self.peerPort, self.node_id, 0)
|
||||||
token = find_value_response[b'token']
|
|
||||||
contact.update_token(token)
|
|
||||||
res = yield contact.store(blob_hash, token, self.peerPort, self.node_id, 0)
|
|
||||||
if res != b"OK":
|
if res != b"OK":
|
||||||
raise ValueError(res)
|
raise ValueError(res)
|
||||||
defer.returnValue(True)
|
|
||||||
log.debug("Stored %s to %s (%s)", binascii.hexlify(blob_hash), contact.log_id(), contact.address)
|
log.debug("Stored %s to %s (%s)", binascii.hexlify(blob_hash), contact.log_id(), contact.address)
|
||||||
|
return True
|
||||||
except protocol.TimeoutError:
|
except protocol.TimeoutError:
|
||||||
log.debug("Timeout while storing blob_hash %s at %s",
|
log.debug("Timeout while storing blob_hash %s at %s",
|
||||||
binascii.hexlify(blob_hash), contact.log_id())
|
binascii.hexlify(blob_hash), contact.log_id())
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
log.error("Unexpected response: %s" % err)
|
log.error("Unexpected response: %s" % err)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
if 'Invalid token' in str(err):
|
||||||
|
contact.update_token(None)
|
||||||
log.error("Unexpected error while storing blob_hash %s at %s: %s",
|
log.error("Unexpected error while storing blob_hash %s at %s: %s",
|
||||||
binascii.hexlify(blob_hash), contact, err)
|
binascii.hexlify(blob_hash), contact, err)
|
||||||
defer.returnValue(False)
|
return False
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def announceHaveBlob(self, blob_hash):
|
def announceHaveBlob(self, blob_hash):
|
||||||
|
|
|
@ -184,6 +184,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
def _update_contact(result): # refresh the contact in the routing table
|
def _update_contact(result): # refresh the contact in the routing table
|
||||||
contact.update_last_replied()
|
contact.update_last_replied()
|
||||||
if method == b'findValue':
|
if method == b'findValue':
|
||||||
|
if b'token' in result:
|
||||||
|
contact.update_token(result[b'token'])
|
||||||
if b'protocolVersion' not in result:
|
if b'protocolVersion' not in result:
|
||||||
contact.update_protocol_version(0)
|
contact.update_protocol_version(0)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -13,6 +13,31 @@ log = logging.getLogger()
|
||||||
class TestStoreExpiration(TestKademliaBase):
|
class TestStoreExpiration(TestKademliaBase):
|
||||||
network_size = 40
|
network_size = 40
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_nullify_token(self):
|
||||||
|
blob_hash = generate_id(1)
|
||||||
|
announcing_node = self.nodes[20]
|
||||||
|
# announce the blob
|
||||||
|
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||||
|
self.pump_clock(5+1)
|
||||||
|
storing_node_ids = yield announce_d
|
||||||
|
self.assertEqual(len(storing_node_ids), 8)
|
||||||
|
|
||||||
|
for node in set(self.nodes).union(set(self._seeds)):
|
||||||
|
# now, everyone has the wrong token
|
||||||
|
node.change_token()
|
||||||
|
node.change_token()
|
||||||
|
|
||||||
|
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||||
|
self.pump_clock(5+1)
|
||||||
|
storing_node_ids = yield announce_d
|
||||||
|
self.assertEqual(len(storing_node_ids), 0) # cant store, wrong tokens, but they get nullified
|
||||||
|
|
||||||
|
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||||
|
self.pump_clock(5+1)
|
||||||
|
storing_node_ids = yield announce_d
|
||||||
|
self.assertEqual(len(storing_node_ids), 8) # next attempt succeeds as it refreshes tokens
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_store_and_expire(self):
|
def test_store_and_expire(self):
|
||||||
blob_hash = generate_id(1)
|
blob_hash = generate_id(1)
|
||||||
|
|
Loading…
Reference in a new issue