fix announce loop when there are no peers to announce

This commit is contained in:
Victor Shyba 2019-05-12 00:42:19 -03:00
parent b7d76fd09f
commit f02df86709
3 changed files with 27 additions and 23 deletions

View file

@ -74,24 +74,25 @@ class Node:
await fut await fut
async def announce_blob(self, blob_hash: str) -> typing.List[bytes]: async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
announced_to_node_ids = [] hash_value = binascii.unhexlify(blob_hash.encode())
while not announced_to_node_ids: assert len(hash_value) == constants.hash_length
hash_value = binascii.unhexlify(blob_hash.encode()) peers = await self.peer_search(hash_value)
assert len(hash_value) == constants.hash_length
peers = await self.peer_search(hash_value)
if not self.protocol.external_ip: if not self.protocol.external_ip:
raise Exception("Cannot determine external IP") raise Exception("Cannot determine external IP")
log.debug("Store to %i peers", len(peers)) log.debug("Store to %i peers", len(peers))
for peer in peers: for peer in peers:
log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port) log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
stored_to_tup = await asyncio.gather( stored_to_tup = await asyncio.gather(
*(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop *(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop
) )
announced_to_node_ids.extend([node_id for node_id, contacted in stored_to_tup if contacted]) stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
if stored_to:
log.info("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8], log.info("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
len(announced_to_node_ids), len(peers)) len(stored_to), len(peers))
return announced_to_node_ids else:
log.warning("Failed announcing %s, stored to 0 peers")
return stored_to
def stop(self) -> None: def stop(self) -> None:
if self.joined.is_set(): if self.joined.is_set():

View file

@ -276,8 +276,7 @@ class IterativeNodeFinder(IterativeFinder):
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))] to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))]
if to_yield: if to_yield:
for peer in to_yield: self.yielded_peers.update(to_yield)
self.yielded_peers.add(peer)
self.iteration_queue.put_nowait(to_yield) self.iteration_queue.put_nowait(to_yield)
if finish: if finish:
self.iteration_queue.put_nowait(None) self.iteration_queue.put_nowait(None)

View file

@ -1,4 +1,5 @@
import asyncio import asyncio
from binascii import hexlify
from lbrynet.dht import constants from lbrynet.dht import constants
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
@ -6,7 +7,7 @@ from lbrynet.dht.peer import PeerManager, KademliaPeer
from torba.testcase import AsyncioTestCase from torba.testcase import AsyncioTestCase
class CLIIntegrationTest(AsyncioTestCase): class DHTIntegrationTest(AsyncioTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):
import logging import logging
@ -24,16 +25,13 @@ class CLIIntegrationTest(AsyncioTestCase):
self.nodes.append(node) self.nodes.append(node)
self.known_node_addresses.append(('127.0.0.1', node_port)) self.known_node_addresses.append(('127.0.0.1', node_port))
await node.start_listening('127.0.0.1') await node.start_listening('127.0.0.1')
self.addCleanup(node.stop)
for node in self.nodes: for node in self.nodes:
node.protocol.rpc_timeout = .2 node.protocol.rpc_timeout = .2
node.protocol.ping_queue._default_delay = .5 node.protocol.ping_queue._default_delay = .5
node.start('127.0.0.1', self.known_node_addresses[:1]) node.start('127.0.0.1', self.known_node_addresses[:1])
await asyncio.gather(*[node.joined.wait() for node in self.nodes]) await asyncio.gather(*[node.joined.wait() for node in self.nodes])
async def asyncTearDown(self):
for node in self.nodes:
node.stop()
async def test_replace_bad_nodes(self): async def test_replace_bad_nodes(self):
await self.setup_network(20) await self.setup_network(20)
self.assertEquals(len(self.nodes), 20) self.assertEquals(len(self.nodes), 20)
@ -55,3 +53,9 @@ class CLIIntegrationTest(AsyncioTestCase):
self.assertIn(peer.node_id, good_nodes) self.assertIn(peer.node_id, good_nodes)
async def test_announce_no_peers(self):
await self.setup_network(1)
node = self.nodes[0]
blob_hash = hexlify(constants.generate_id(1337)).decode()
peers = await node.announce_blob(blob_hash)
self.assertEqual(len(peers), 0)