diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 9c9ea8840..c9983f756 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -4949,7 +4949,6 @@ class Daemon(metaclass=JSONRPCServerType): DHT / Blob Exchange peer commands. """ - @requires(DHT_COMPONENT) async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None): """ Get peers for blob hash @@ -4977,8 +4976,11 @@ class Daemon(metaclass=JSONRPCServerType): tracker_peers = await tracker.get_kademlia_peer_list(bytes.fromhex(blob_hash)) log.info("Found %d peers for %s from trackers.", len(tracker_peers), blob_hash[:8]) peer_q.put_nowait(tracker_peers) + elif not self.component_manager.has_component(DHT_COMPONENT): + raise Exception("Peer list needs, at least, either a DHT component or a Tracker component for discovery.") peers = [] - await self.dht_node._peers_for_value_producer(blob_hash, peer_q) + if self.component_manager.has_component(DHT_COMPONENT): + await self.dht_node._peers_for_value_producer(blob_hash, peer_q) while not peer_q.empty(): peers.extend(peer_q.get_nowait()) results = { diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index c7d588267..0d9684443 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -67,8 +67,9 @@ class StreamDownloader: self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers) async def _process_announcement(self, announcement: 'AnnounceResponse'): - peers = [(str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers] - peers = await get_kademlia_peers_from_hosts(peers) + peers = await get_kademlia_peers_from_hosts([ + (str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers if peer.port > 1024 + ]) log.info("Found %d peers from tracker for %s", len(peers), self.sd_hash[:8]) self.peer_queue.put_nowait(peers) diff --git a/lbry/torrent/tracker.py b/lbry/torrent/tracker.py index 228353ef1..0eb64b793 100644 --- a/lbry/torrent/tracker.py +++ b/lbry/torrent/tracker.py @@ -187,10 +187,10 @@ class TrackerClient: errors = sum([1 for result in results if result is None or isinstance(result, Exception)]) log.info("Tracker: finished announcing %d files to %s:%d, %d errors", len(results), *server, errors) - async def get_peer_list(self, info_hash, stopped=False, on_announcement=None): + async def get_peer_list(self, info_hash, stopped=False, on_announcement=None, no_port=False): found = [] - servers = self._get_servers() - for done in asyncio.as_completed([self._probe_server(info_hash, *server, stopped) for server in servers]): + probes = [self._probe_server(info_hash, *server, stopped, no_port) for server in self._get_servers()] + for done in asyncio.as_completed(probes): result = await done if result is not None: await asyncio.gather(*filter(asyncio.iscoroutine, [on_announcement(result)] if on_announcement else [])) @@ -198,11 +198,14 @@ class TrackerClient: return found async def get_kademlia_peer_list(self, info_hash): - responses = await self.get_peer_list(info_hash) - peers = [(str(ipaddress.ip_address(peer.address)), peer.port) for ann in responses for peer in ann.peers] + responses = await self.get_peer_list(info_hash, no_port=True) + peers = [ + (str(ipaddress.ip_address(peer.address)), peer.port) + for ann in responses for peer in ann.peers if peer.port > 1024 # filter out privileged and 0 + ] return await get_kademlia_peers_from_hosts(peers) - async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False): + async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False, no_port=False): result = None try: tracker_host = await resolve_host(tracker_host, tracker_port, 'udp') @@ -216,7 +219,7 @@ class TrackerClient: return result try: result = await self.client.announce( - info_hash, self.peer_id, self.announce_port, tracker_host, tracker_port, stopped) + info_hash, self.peer_id, 0 if no_port else self.announce_port, tracker_host, tracker_port, stopped) self.results[tracker_host][info_hash] = (time.time() + result.interval, result) except asyncio.TimeoutError: # todo: this is UDP, timeout is common, we need a better metric for failures self.results[tracker_host][info_hash] = (time.time() + 60.0, result) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 970d54898..ffde6acc9 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -120,6 +120,14 @@ class FileCommands(CommandTestCase): await self.wait_files_to_complete() self.assertEqual(0, stream.blobs_remaining) self.assertEqual(2, len(server.peers[bytes.fromhex(sd_hash)[:20]])) + self.assertEqual([{'address': '127.0.0.1', + 'node_id': None, + 'tcp_port': 5567, + 'udp_port': None}, + {'address': '127.0.0.1', + 'node_id': None, + 'tcp_port': 4444, + 'udp_port': None}], (await self.daemon.jsonrpc_peer_list(sd_hash))['items']) async def test_announces(self): # announces on publish diff --git a/tests/unit/torrent/test_tracker.py b/tests/unit/torrent/test_tracker.py index b240ac60e..a411cddb0 100644 --- a/tests/unit/torrent/test_tracker.py +++ b/tests/unit/torrent/test_tracker.py @@ -9,7 +9,7 @@ class UDPTrackerClientTestCase(AsyncioTestCase): async def asyncSetUp(self): self.client_servers_list = [] self.servers = {} - self.client = TrackerClient(b"\x00" * 48, 4444, lambda: self.client_servers_list, timeout=0.1) + self.client = TrackerClient(b"\x00" * 48, 4444, lambda: self.client_servers_list, timeout=1) await self.client.start() self.addCleanup(self.client.stop) await self.add_server()