diff --git a/lbrynet/extras/daemon/analytics.py b/lbrynet/extras/daemon/analytics.py index cd33bd0a7..0d4c7de0a 100644 --- a/lbrynet/extras/daemon/analytics.py +++ b/lbrynet/extras/daemon/analytics.py @@ -45,7 +45,8 @@ def _event_properties(installation_id: str, session_id: str, def _download_properties(conf: Config, external_ip: str, resolve_duration: float, total_duration: typing.Optional[float], download_id: str, name: str, outpoint: str, active_peer_count: int, tried_peers_count: int, - sd_hash: str, sd_download_duration: typing.Optional[float] = None, + added_fixed_peers: bool, fixed_peer_delay: float, sd_hash: str, + sd_download_duration: typing.Optional[float] = None, head_blob_hash: typing.Optional[str] = None, head_blob_length: typing.Optional[int] = None, head_blob_download_duration: typing.Optional[float] = None, @@ -63,9 +64,8 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float "peer_connect_timeout": conf.peer_connect_timeout, "blob_download_timeout": conf.blob_download_timeout, "use_fixed_peers": len(conf.reflector_servers) > 0, - "fixed_peer_delay": conf.fixed_peer_delay, - "added_fixed_peers": (conf.fixed_peer_delay < total_duration) and len(conf.reflector_servers) > 0, - + "fixed_peer_delay": fixed_peer_delay, + "added_fixed_peers": added_fixed_peers, "active_peer_count": active_peer_count, "tried_peers_count": tried_peers_count, @@ -173,7 +173,8 @@ class AnalyticsManager: async def send_time_to_first_bytes(self, resolve_duration: typing.Optional[float], total_duration: typing.Optional[float], download_id: str, name: str, outpoint: str, found_peers_count: int, - tried_peers_count: int, sd_hash: str, + tried_peers_count: int, added_fixed_peers: bool, + fixed_peers_delay: float, sd_hash: str, sd_download_duration: typing.Optional[float] = None, head_blob_hash: typing.Optional[str] = None, head_blob_length: typing.Optional[int] = None, @@ -181,8 +182,8 @@ class AnalyticsManager: error: typing.Optional[str] = None): await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties( self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint, - found_peers_count, tried_peers_count, sd_hash, sd_download_duration, head_blob_hash, head_blob_length, - head_blob_duration, error + found_peers_count, tried_peers_count, added_fixed_peers, fixed_peers_delay, sd_hash, + sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error ))) async def send_download_finished(self, download_id, name, sd_hash): diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index 70360e878..5aeead15c 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -36,6 +36,8 @@ class StreamDownloader(StreamAssembler): self.node: typing.Optional['Node'] = None self.assemble_task: typing.Optional[asyncio.Task] = None self.fixed_peers_handle: typing.Optional[asyncio.Handle] = None + self.fixed_peers_delay: typing.Optional[float] = None + self.added_fixed_peers = False async def setup(self): # start the peer accumulator and initialize the downloader if self.blob_downloader: @@ -82,17 +84,23 @@ class StreamDownloader(StreamAssembler): (await resolve_host(url, port + 1, proto='tcp'), port) for url, port in self.config.reflector_servers ] - delay = self.config.fixed_peer_delay if ( - 'dht' not in self.config.components_to_skip - and self.node and len(self.node.protocol.routing_table.get_peers()) - ) else 0.0 - self.loop.call_later(delay, lambda: + + def _delayed_add_fixed_peers(): + self.added_fixed_peers = True self.peer_queue.put_nowait([ KademliaPeer(self.loop, address=address, tcp_port=port + 1) for address, port in addresses - ])) - if self.config.reflector_servers: - self.loop.create_task(_add_fixed_peers()) + ]) + + self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers) + if not self.config.reflector_servers: + return + if 'dht' in self.config.components_to_skip or not self.node or not \ + len(self.node.protocol.routing_table.get_peers()): + self.fixed_peers_delay = 0.0 + else: + self.fixed_peers_delay = self.config.fixed_peer_delay + self.loop.create_task(_add_fixed_peers()) def download(self, node: typing.Optional['Node'] = None): self.node = node diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 533924b86..0111bc5bb 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -482,6 +482,8 @@ class StreamManager: resolved_time, self.loop.time() - start_time, download_id, parse_lbry_uri(uri).name, outpoint, None if not stream else len(stream.downloader.blob_downloader.active_connections), None if not stream else len(stream.downloader.blob_downloader.scores), + False if not downloader else downloader.added_fixed_peers, + self.config.fixed_peer_delay if not downloader else downloader.fixed_peers_delay, claim.source_hash.decode(), time_to_descriptor, None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash, None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length, diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index 8b42e5a41..d96580898 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -14,6 +14,8 @@ from lbrynet.extras.daemon.analytics import AnalyticsManager from lbrynet.stream.stream_manager import StreamManager from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.dht.node import Node +from lbrynet.dht.protocol.protocol import KademliaProtocol +from lbrynet.dht.protocol.routing_table import TreeRoutingTable from lbrynet.schema.claim import ClaimDict @@ -26,6 +28,9 @@ def get_mock_node(peer=None): return q2, asyncio.create_task(_task()) mock_node = mock.Mock(spec=Node) + mock_node.protocol = mock.Mock(spec=KademliaProtocol) + mock_node.protocol.routing_table = mock.Mock(spec=TreeRoutingTable) + mock_node.protocol.routing_table.get_peers = lambda: [] mock_node.accumulate_peers = mock_accumulate_peers mock_node.joined = asyncio.Event() mock_node.joined.set() @@ -113,51 +118,106 @@ class TestStreamManager(BlobExchangeTestBase): binascii.hexlify(generate_id()).decode())) self.exchange_rate_manager = get_dummy_exchange_rate_manager(time) - async def test_time_to_first_bytes(self): + async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None): await self.setup_stream_manager() - checked_post = False + if after_setup: + after_setup() + checked_analytics_event = False - async def check_post(event): + async def _check_post(event): + check_post(event) + nonlocal checked_analytics_event + checked_analytics_event = True + + self.stream_manager.analytics_manager._post = _check_post + if error: + with self.assertRaises(error): + await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + else: + await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await asyncio.sleep(0, loop=self.loop) + self.assertTrue(checked_analytics_event) + + async def test_time_to_first_bytes(self): + def check_post(event): self.assertEqual(event['event'], 'Time To First Bytes') total_duration = event['properties']['total_duration'] resolve_duration = event['properties']['resolve_duration'] head_blob_duration = event['properties']['head_blob_duration'] sd_blob_duration = event['properties']['sd_blob_duration'] + self.assertFalse(event['properties']['added_fixed_peers']) self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration) - nonlocal checked_post - checked_post = True - self.stream_manager.analytics_manager._post = check_post - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) - await asyncio.sleep(0, loop=self.loop) - self.assertTrue(checked_post) + await self._test_time_to_first_bytes(check_post) + + async def test_fixed_peer_delay_dht_peers_found(self): + self.client_config.reflector_servers = [(self.server_from_client.address, self.server_from_client.tcp_port - 1)] + server_from_client = None + self.server_from_client, server_from_client = server_from_client, self.server_from_client + + def after_setup(): + self.stream_manager.node.protocol.routing_table.get_peers = lambda: [server_from_client] + + def check_post(event): + self.assertEqual(event['event'], 'Time To First Bytes') + total_duration = event['properties']['total_duration'] + resolve_duration = event['properties']['resolve_duration'] + head_blob_duration = event['properties']['head_blob_duration'] + sd_blob_duration = event['properties']['sd_blob_duration'] + + self.assertEqual(event['event'], 'Time To First Bytes') + self.assertEqual(event['properties']['tried_peers_count'], 1) + self.assertEqual(event['properties']['active_peer_count'], 1) + self.assertEqual(event['properties']['use_fixed_peers'], True) + self.assertEqual(event['properties']['added_fixed_peers'], True) + self.assertEqual(event['properties']['fixed_peer_delay'], self.client_config.fixed_peer_delay) + self.assertGreaterEqual(total_duration, resolve_duration + head_blob_duration + sd_blob_duration) + + await self._test_time_to_first_bytes(check_post, after_setup=after_setup) + + async def test_override_fixed_peer_delay_dht_disabled(self): + self.client_config.reflector_servers = [(self.server_from_client.address, self.server_from_client.tcp_port - 1)] + self.client_config.components_to_skip = ['dht', 'hash_announcer'] + self.client_config.fixed_peer_delay = 9001.0 + self.server_from_client = None + + def check_post(event): + total_duration = event['properties']['total_duration'] + resolve_duration = event['properties']['resolve_duration'] + head_blob_duration = event['properties']['head_blob_duration'] + sd_blob_duration = event['properties']['sd_blob_duration'] + + self.assertEqual(event['event'], 'Time To First Bytes') + self.assertEqual(event['properties']['tried_peers_count'], 1) + self.assertEqual(event['properties']['active_peer_count'], 1) + self.assertEqual(event['properties']['use_fixed_peers'], True) + self.assertEqual(event['properties']['added_fixed_peers'], True) + self.assertEqual(event['properties']['fixed_peer_delay'], 0.0) + self.assertGreaterEqual(total_duration, resolve_duration + head_blob_duration + sd_blob_duration) + + start = self.loop.time() + await self._test_time_to_first_bytes(check_post) + self.assertTrue(self.loop.time() - start < 3) async def test_no_peers_timeout(self): + # FIXME: the download should ideally fail right away if there are no peers + # to initialize the shortlist and fixed peers are disabled self.server_from_client = None - self.client_config.fixed_peer_delay = 9001.0 self.client_config.download_timeout = 3.0 - await self.setup_stream_manager() - checked_post = False - async def check_post(event): + def check_post(event): self.assertEqual(event['event'], 'Time To First Bytes') self.assertEqual(event['properties']['error'], 'DownloadSDTimeout') self.assertEqual(event['properties']['tried_peers_count'], None) self.assertEqual(event['properties']['active_peer_count'], None) self.assertEqual(event['properties']['use_fixed_peers'], False) self.assertEqual(event['properties']['added_fixed_peers'], False) - self.assertEqual(event['properties']['fixed_peer_delay'], 9001) - nonlocal checked_post - checked_post = True + self.assertEqual(event['properties']['fixed_peer_delay'], None) - self.stream_manager.analytics_manager._post = check_post start = self.loop.time() - with self.assertRaises(DownloadSDTimeout): - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self._test_time_to_first_bytes(check_post, DownloadSDTimeout) duration = self.loop.time() - start - await asyncio.sleep(0, loop=self.loop) self.assertTrue(4.0 >= duration >= 3.0) - self.assertTrue(checked_post) async def test_download_stop_resume_delete(self): await self.setup_stream_manager()