diff --git a/lbry/lbry/extras/daemon/analytics.py b/lbry/lbry/extras/daemon/analytics.py index 1b0c093e7..a0c3e8e5d 100644 --- a/lbry/lbry/extras/daemon/analytics.py +++ b/lbry/lbry/extras/daemon/analytics.py @@ -51,7 +51,7 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float head_blob_hash: typing.Optional[str] = None, head_blob_length: typing.Optional[int] = None, head_blob_download_duration: typing.Optional[float] = None, - error: typing.Optional[str] = None, + error: typing.Optional[str] = None, error_msg: typing.Optional[str] = None, wallet_server: typing.Optional[str] = None) -> typing.Dict: return { "external_ip": external_ip, @@ -59,6 +59,7 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float "total_duration": round(total_duration, 4), "resolve_duration": None if not resolve_duration else round(resolve_duration, 4), "error": error, + "error_message": error_msg, 'name': name, "outpoint": outpoint, @@ -187,11 +188,13 @@ class AnalyticsManager: head_blob_length: typing.Optional[int] = None, head_blob_duration: typing.Optional[int] = None, error: typing.Optional[str] = None, + error_msg: typing.Optional[str] = None, wallet_server: typing.Optional[str] = None): await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties( self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint, found_peers_count, tried_peers_count, connection_failures_count, added_fixed_peers, fixed_peers_delay, - sd_hash, sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error, wallet_server + sd_hash, sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error, error_msg, + wallet_server ))) async def send_download_finished(self, download_id, name, sd_hash): diff --git a/lbry/lbry/stream/stream_manager.py b/lbry/lbry/stream/stream_manager.py index 97c2fc128..f15d8ee82 100644 --- a/lbry/lbry/stream/stream_manager.py +++ b/lbry/lbry/stream/stream_manager.py @@ -464,6 +464,7 @@ class StreamManager: None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length, None if not stream else stream.downloader.time_to_first_bytes, None if not error else error.__class__.__name__, + None if not error else str(error), None if not server else f"{server[0]}:{server[1]}" ) ) diff --git a/lbry/tests/unit/stream/test_stream_manager.py b/lbry/tests/unit/stream/test_stream_manager.py index be558af87..810cdc145 100644 --- a/lbry/tests/unit/stream/test_stream_manager.py +++ b/lbry/tests/unit/stream/test_stream_manager.py @@ -1,4 +1,5 @@ import os +import shutil import binascii from unittest import mock import asyncio @@ -77,6 +78,7 @@ def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): mock_wallet = mock.Mock(spec=LbryWalletManager) mock_wallet.ledger.resolve = mock_resolve + mock_wallet.ledger.network.client.server = ('fakespv.lbry.com', 50001) async def get_balance(*_): return balance @@ -130,6 +132,7 @@ class TestStreamManager(BlobExchangeTestBase): head_blob_duration = event['properties']['head_blob_duration'] sd_blob_duration = event['properties']['sd_blob_duration'] self.assertFalse(event['properties']['added_fixed_peers']) + self.assertEqual(event['properties']['wallet_server'], "fakespv.lbry.com:50001") self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration) await self._test_time_to_first_bytes(check_post) @@ -172,6 +175,9 @@ class TestStreamManager(BlobExchangeTestBase): self.assertIsNone(event['properties']['sd_blob_duration']) self.assertFalse(event['properties']['added_fixed_peers']) self.assertEqual(event['properties']['connection_failures_count'], 1) + self.assertEqual( + event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout' + ) await self._test_time_to_first_bytes(check_post, DownloadSDTimeout, after_setup=after_setup) @@ -213,6 +219,9 @@ class TestStreamManager(BlobExchangeTestBase): self.assertEqual(event['properties']['use_fixed_peers'], False) self.assertEqual(event['properties']['added_fixed_peers'], False) self.assertEqual(event['properties']['fixed_peer_delay'], None) + self.assertEqual( + event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout' + ) start = self.loop.time() await self._test_time_to_first_bytes(check_post, DownloadSDTimeout) @@ -272,14 +281,21 @@ class TestStreamManager(BlobExchangeTestBase): self.assertListEqual(expected_events, received) async def _test_download_error_on_start(self, expected_error, timeout=None): - with self.assertRaises(expected_error): + error = None + try: await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout) + except Exception as err: + if isinstance(err, asyncio.CancelledError): + raise + error = err + self.assertEqual(expected_error, type(error)) - async def _test_download_error_analytics_on_start(self, expected_error, timeout=None): + async def _test_download_error_analytics_on_start(self, expected_error, error_message, timeout=None): received = [] async def check_post(event): self.assertEqual("Time To First Bytes", event['event']) + self.assertEqual(event['properties']['error_message'], error_message) received.append(event['properties']['error']) self.stream_manager.analytics_manager._post = check_post @@ -295,7 +311,7 @@ class TestStreamManager(BlobExchangeTestBase): 'version': '_0_0_1' } await self.setup_stream_manager(10.0, fee) - await self._test_download_error_on_start(InsufficientFundsError) + await self._test_download_error_on_start(InsufficientFundsError, "") async def test_fee_above_max_allowed(self): fee = { @@ -305,7 +321,7 @@ class TestStreamManager(BlobExchangeTestBase): 'version': '_0_0_1' } await self.setup_stream_manager(1000000.0, fee) - await self._test_download_error_on_start(KeyFeeAboveMaxAllowed) + await self._test_download_error_on_start(KeyFeeAboveMaxAllowed, "") async def test_resolve_error(self): await self.setup_stream_manager() @@ -315,14 +331,27 @@ class TestStreamManager(BlobExchangeTestBase): async def test_download_sd_timeout(self): self.server.stop_server() await self.setup_stream_manager() - await self._test_download_error_analytics_on_start(DownloadSDTimeout, timeout=1) + await self._test_download_error_analytics_on_start( + DownloadSDTimeout, f'Failed to download sd blob {self.sd_hash} within timeout', timeout=1 + ) async def test_download_data_timeout(self): await self.setup_stream_manager() with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf: head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash'] self.server_blob_manager.delete_blob(head_blob_hash) - await self._test_download_error_analytics_on_start(DownloadDataTimeout, timeout=1) + await self._test_download_error_analytics_on_start( + DownloadDataTimeout, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout', timeout=1 + ) + + async def test_unexpected_error(self): + await self.setup_stream_manager() + err_msg = f"invalid blob directory '{self.client_dir}'" + shutil.rmtree(self.client_dir) + await self._test_download_error_analytics_on_start( + OSError, err_msg, timeout=1 + ) + os.mkdir(self.client_dir) # so the test cleanup doesn't error async def test_non_head_data_timeout(self): await self.setup_stream_manager()