update tests, query with port 0, filter bad ports earlier, make unit tests more reliable
This commit is contained in:
parent
3dc145fe68
commit
7cba51ca7d
5 changed files with 26 additions and 12 deletions
|
@ -4949,7 +4949,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
DHT / Blob Exchange peer commands.
|
DHT / Blob Exchange peer commands.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@requires(DHT_COMPONENT)
|
|
||||||
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
|
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
|
||||||
"""
|
"""
|
||||||
Get peers for blob hash
|
Get peers for blob hash
|
||||||
|
@ -4977,8 +4976,11 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
tracker_peers = await tracker.get_kademlia_peer_list(bytes.fromhex(blob_hash))
|
tracker_peers = await tracker.get_kademlia_peer_list(bytes.fromhex(blob_hash))
|
||||||
log.info("Found %d peers for %s from trackers.", len(tracker_peers), blob_hash[:8])
|
log.info("Found %d peers for %s from trackers.", len(tracker_peers), blob_hash[:8])
|
||||||
peer_q.put_nowait(tracker_peers)
|
peer_q.put_nowait(tracker_peers)
|
||||||
|
elif not self.component_manager.has_component(DHT_COMPONENT):
|
||||||
|
raise Exception("Peer list needs, at least, either a DHT component or a Tracker component for discovery.")
|
||||||
peers = []
|
peers = []
|
||||||
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
|
if self.component_manager.has_component(DHT_COMPONENT):
|
||||||
|
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
|
||||||
while not peer_q.empty():
|
while not peer_q.empty():
|
||||||
peers.extend(peer_q.get_nowait())
|
peers.extend(peer_q.get_nowait())
|
||||||
results = {
|
results = {
|
||||||
|
|
|
@ -67,8 +67,9 @@ class StreamDownloader:
|
||||||
self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)
|
self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)
|
||||||
|
|
||||||
async def _process_announcement(self, announcement: 'AnnounceResponse'):
|
async def _process_announcement(self, announcement: 'AnnounceResponse'):
|
||||||
peers = [(str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers]
|
peers = await get_kademlia_peers_from_hosts([
|
||||||
peers = await get_kademlia_peers_from_hosts(peers)
|
(str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers if peer.port > 1024
|
||||||
|
])
|
||||||
log.info("Found %d peers from tracker for %s", len(peers), self.sd_hash[:8])
|
log.info("Found %d peers from tracker for %s", len(peers), self.sd_hash[:8])
|
||||||
self.peer_queue.put_nowait(peers)
|
self.peer_queue.put_nowait(peers)
|
||||||
|
|
||||||
|
|
|
@ -187,10 +187,10 @@ class TrackerClient:
|
||||||
errors = sum([1 for result in results if result is None or isinstance(result, Exception)])
|
errors = sum([1 for result in results if result is None or isinstance(result, Exception)])
|
||||||
log.info("Tracker: finished announcing %d files to %s:%d, %d errors", len(results), *server, errors)
|
log.info("Tracker: finished announcing %d files to %s:%d, %d errors", len(results), *server, errors)
|
||||||
|
|
||||||
async def get_peer_list(self, info_hash, stopped=False, on_announcement=None):
|
async def get_peer_list(self, info_hash, stopped=False, on_announcement=None, no_port=False):
|
||||||
found = []
|
found = []
|
||||||
servers = self._get_servers()
|
probes = [self._probe_server(info_hash, *server, stopped, no_port) for server in self._get_servers()]
|
||||||
for done in asyncio.as_completed([self._probe_server(info_hash, *server, stopped) for server in servers]):
|
for done in asyncio.as_completed(probes):
|
||||||
result = await done
|
result = await done
|
||||||
if result is not None:
|
if result is not None:
|
||||||
await asyncio.gather(*filter(asyncio.iscoroutine, [on_announcement(result)] if on_announcement else []))
|
await asyncio.gather(*filter(asyncio.iscoroutine, [on_announcement(result)] if on_announcement else []))
|
||||||
|
@ -198,11 +198,14 @@ class TrackerClient:
|
||||||
return found
|
return found
|
||||||
|
|
||||||
async def get_kademlia_peer_list(self, info_hash):
|
async def get_kademlia_peer_list(self, info_hash):
|
||||||
responses = await self.get_peer_list(info_hash)
|
responses = await self.get_peer_list(info_hash, no_port=True)
|
||||||
peers = [(str(ipaddress.ip_address(peer.address)), peer.port) for ann in responses for peer in ann.peers]
|
peers = [
|
||||||
|
(str(ipaddress.ip_address(peer.address)), peer.port)
|
||||||
|
for ann in responses for peer in ann.peers if peer.port > 1024 # filter out privileged and 0
|
||||||
|
]
|
||||||
return await get_kademlia_peers_from_hosts(peers)
|
return await get_kademlia_peers_from_hosts(peers)
|
||||||
|
|
||||||
async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False):
|
async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False, no_port=False):
|
||||||
result = None
|
result = None
|
||||||
try:
|
try:
|
||||||
tracker_host = await resolve_host(tracker_host, tracker_port, 'udp')
|
tracker_host = await resolve_host(tracker_host, tracker_port, 'udp')
|
||||||
|
@ -216,7 +219,7 @@ class TrackerClient:
|
||||||
return result
|
return result
|
||||||
try:
|
try:
|
||||||
result = await self.client.announce(
|
result = await self.client.announce(
|
||||||
info_hash, self.peer_id, self.announce_port, tracker_host, tracker_port, stopped)
|
info_hash, self.peer_id, 0 if no_port else self.announce_port, tracker_host, tracker_port, stopped)
|
||||||
self.results[tracker_host][info_hash] = (time.time() + result.interval, result)
|
self.results[tracker_host][info_hash] = (time.time() + result.interval, result)
|
||||||
except asyncio.TimeoutError: # todo: this is UDP, timeout is common, we need a better metric for failures
|
except asyncio.TimeoutError: # todo: this is UDP, timeout is common, we need a better metric for failures
|
||||||
self.results[tracker_host][info_hash] = (time.time() + 60.0, result)
|
self.results[tracker_host][info_hash] = (time.time() + 60.0, result)
|
||||||
|
|
|
@ -120,6 +120,14 @@ class FileCommands(CommandTestCase):
|
||||||
await self.wait_files_to_complete()
|
await self.wait_files_to_complete()
|
||||||
self.assertEqual(0, stream.blobs_remaining)
|
self.assertEqual(0, stream.blobs_remaining)
|
||||||
self.assertEqual(2, len(server.peers[bytes.fromhex(sd_hash)[:20]]))
|
self.assertEqual(2, len(server.peers[bytes.fromhex(sd_hash)[:20]]))
|
||||||
|
self.assertEqual([{'address': '127.0.0.1',
|
||||||
|
'node_id': None,
|
||||||
|
'tcp_port': 5567,
|
||||||
|
'udp_port': None},
|
||||||
|
{'address': '127.0.0.1',
|
||||||
|
'node_id': None,
|
||||||
|
'tcp_port': 4444,
|
||||||
|
'udp_port': None}], (await self.daemon.jsonrpc_peer_list(sd_hash))['items'])
|
||||||
|
|
||||||
async def test_announces(self):
|
async def test_announces(self):
|
||||||
# announces on publish
|
# announces on publish
|
||||||
|
|
|
@ -9,7 +9,7 @@ class UDPTrackerClientTestCase(AsyncioTestCase):
|
||||||
async def asyncSetUp(self):
|
async def asyncSetUp(self):
|
||||||
self.client_servers_list = []
|
self.client_servers_list = []
|
||||||
self.servers = {}
|
self.servers = {}
|
||||||
self.client = TrackerClient(b"\x00" * 48, 4444, lambda: self.client_servers_list, timeout=0.1)
|
self.client = TrackerClient(b"\x00" * 48, 4444, lambda: self.client_servers_list, timeout=1)
|
||||||
await self.client.start()
|
await self.client.start()
|
||||||
self.addCleanup(self.client.stop)
|
self.addCleanup(self.client.stop)
|
||||||
await self.add_server()
|
await self.add_server()
|
||||||
|
|
Loading…
Reference in a new issue