forked from LBRYCommunity/lbry-sdk
add error_message to ttfb analytics, update tests
This commit is contained in:
parent
fba1b28615
commit
b8b42a1fd2
3 changed files with 41 additions and 8 deletions
|
@ -51,7 +51,7 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float
|
||||||
head_blob_hash: typing.Optional[str] = None,
|
head_blob_hash: typing.Optional[str] = None,
|
||||||
head_blob_length: typing.Optional[int] = None,
|
head_blob_length: typing.Optional[int] = None,
|
||||||
head_blob_download_duration: typing.Optional[float] = None,
|
head_blob_download_duration: typing.Optional[float] = None,
|
||||||
error: typing.Optional[str] = None,
|
error: typing.Optional[str] = None, error_msg: typing.Optional[str] = None,
|
||||||
wallet_server: typing.Optional[str] = None) -> typing.Dict:
|
wallet_server: typing.Optional[str] = None) -> typing.Dict:
|
||||||
return {
|
return {
|
||||||
"external_ip": external_ip,
|
"external_ip": external_ip,
|
||||||
|
@ -59,6 +59,7 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float
|
||||||
"total_duration": round(total_duration, 4),
|
"total_duration": round(total_duration, 4),
|
||||||
"resolve_duration": None if not resolve_duration else round(resolve_duration, 4),
|
"resolve_duration": None if not resolve_duration else round(resolve_duration, 4),
|
||||||
"error": error,
|
"error": error,
|
||||||
|
"error_message": error_msg,
|
||||||
'name': name,
|
'name': name,
|
||||||
"outpoint": outpoint,
|
"outpoint": outpoint,
|
||||||
|
|
||||||
|
@ -187,11 +188,13 @@ class AnalyticsManager:
|
||||||
head_blob_length: typing.Optional[int] = None,
|
head_blob_length: typing.Optional[int] = None,
|
||||||
head_blob_duration: typing.Optional[int] = None,
|
head_blob_duration: typing.Optional[int] = None,
|
||||||
error: typing.Optional[str] = None,
|
error: typing.Optional[str] = None,
|
||||||
|
error_msg: typing.Optional[str] = None,
|
||||||
wallet_server: typing.Optional[str] = None):
|
wallet_server: typing.Optional[str] = None):
|
||||||
await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
|
await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
|
||||||
self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
|
self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
|
||||||
found_peers_count, tried_peers_count, connection_failures_count, added_fixed_peers, fixed_peers_delay,
|
found_peers_count, tried_peers_count, connection_failures_count, added_fixed_peers, fixed_peers_delay,
|
||||||
sd_hash, sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error, wallet_server
|
sd_hash, sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error, error_msg,
|
||||||
|
wallet_server
|
||||||
)))
|
)))
|
||||||
|
|
||||||
async def send_download_finished(self, download_id, name, sd_hash):
|
async def send_download_finished(self, download_id, name, sd_hash):
|
||||||
|
|
|
@ -464,6 +464,7 @@ class StreamManager:
|
||||||
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
|
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
|
||||||
None if not stream else stream.downloader.time_to_first_bytes,
|
None if not stream else stream.downloader.time_to_first_bytes,
|
||||||
None if not error else error.__class__.__name__,
|
None if not error else error.__class__.__name__,
|
||||||
|
None if not error else str(error),
|
||||||
None if not server else f"{server[0]}:{server[1]}"
|
None if not server else f"{server[0]}:{server[1]}"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
import binascii
|
import binascii
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
import asyncio
|
import asyncio
|
||||||
|
@ -77,6 +78,7 @@ def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
|
||||||
|
|
||||||
mock_wallet = mock.Mock(spec=LbryWalletManager)
|
mock_wallet = mock.Mock(spec=LbryWalletManager)
|
||||||
mock_wallet.ledger.resolve = mock_resolve
|
mock_wallet.ledger.resolve = mock_resolve
|
||||||
|
mock_wallet.ledger.network.client.server = ('fakespv.lbry.com', 50001)
|
||||||
|
|
||||||
async def get_balance(*_):
|
async def get_balance(*_):
|
||||||
return balance
|
return balance
|
||||||
|
@ -130,6 +132,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
head_blob_duration = event['properties']['head_blob_duration']
|
head_blob_duration = event['properties']['head_blob_duration']
|
||||||
sd_blob_duration = event['properties']['sd_blob_duration']
|
sd_blob_duration = event['properties']['sd_blob_duration']
|
||||||
self.assertFalse(event['properties']['added_fixed_peers'])
|
self.assertFalse(event['properties']['added_fixed_peers'])
|
||||||
|
self.assertEqual(event['properties']['wallet_server'], "fakespv.lbry.com:50001")
|
||||||
self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration)
|
self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration)
|
||||||
|
|
||||||
await self._test_time_to_first_bytes(check_post)
|
await self._test_time_to_first_bytes(check_post)
|
||||||
|
@ -172,6 +175,9 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertIsNone(event['properties']['sd_blob_duration'])
|
self.assertIsNone(event['properties']['sd_blob_duration'])
|
||||||
self.assertFalse(event['properties']['added_fixed_peers'])
|
self.assertFalse(event['properties']['added_fixed_peers'])
|
||||||
self.assertEqual(event['properties']['connection_failures_count'], 1)
|
self.assertEqual(event['properties']['connection_failures_count'], 1)
|
||||||
|
self.assertEqual(
|
||||||
|
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout'
|
||||||
|
)
|
||||||
|
|
||||||
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout, after_setup=after_setup)
|
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout, after_setup=after_setup)
|
||||||
|
|
||||||
|
@ -213,6 +219,9 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertEqual(event['properties']['use_fixed_peers'], False)
|
self.assertEqual(event['properties']['use_fixed_peers'], False)
|
||||||
self.assertEqual(event['properties']['added_fixed_peers'], False)
|
self.assertEqual(event['properties']['added_fixed_peers'], False)
|
||||||
self.assertEqual(event['properties']['fixed_peer_delay'], None)
|
self.assertEqual(event['properties']['fixed_peer_delay'], None)
|
||||||
|
self.assertEqual(
|
||||||
|
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout'
|
||||||
|
)
|
||||||
|
|
||||||
start = self.loop.time()
|
start = self.loop.time()
|
||||||
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout)
|
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout)
|
||||||
|
@ -272,14 +281,21 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertListEqual(expected_events, received)
|
self.assertListEqual(expected_events, received)
|
||||||
|
|
||||||
async def _test_download_error_on_start(self, expected_error, timeout=None):
|
async def _test_download_error_on_start(self, expected_error, timeout=None):
|
||||||
with self.assertRaises(expected_error):
|
error = None
|
||||||
|
try:
|
||||||
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout)
|
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout)
|
||||||
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError):
|
||||||
|
raise
|
||||||
|
error = err
|
||||||
|
self.assertEqual(expected_error, type(error))
|
||||||
|
|
||||||
async def _test_download_error_analytics_on_start(self, expected_error, timeout=None):
|
async def _test_download_error_analytics_on_start(self, expected_error, error_message, timeout=None):
|
||||||
received = []
|
received = []
|
||||||
|
|
||||||
async def check_post(event):
|
async def check_post(event):
|
||||||
self.assertEqual("Time To First Bytes", event['event'])
|
self.assertEqual("Time To First Bytes", event['event'])
|
||||||
|
self.assertEqual(event['properties']['error_message'], error_message)
|
||||||
received.append(event['properties']['error'])
|
received.append(event['properties']['error'])
|
||||||
|
|
||||||
self.stream_manager.analytics_manager._post = check_post
|
self.stream_manager.analytics_manager._post = check_post
|
||||||
|
@ -295,7 +311,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
'version': '_0_0_1'
|
'version': '_0_0_1'
|
||||||
}
|
}
|
||||||
await self.setup_stream_manager(10.0, fee)
|
await self.setup_stream_manager(10.0, fee)
|
||||||
await self._test_download_error_on_start(InsufficientFundsError)
|
await self._test_download_error_on_start(InsufficientFundsError, "")
|
||||||
|
|
||||||
async def test_fee_above_max_allowed(self):
|
async def test_fee_above_max_allowed(self):
|
||||||
fee = {
|
fee = {
|
||||||
|
@ -305,7 +321,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
'version': '_0_0_1'
|
'version': '_0_0_1'
|
||||||
}
|
}
|
||||||
await self.setup_stream_manager(1000000.0, fee)
|
await self.setup_stream_manager(1000000.0, fee)
|
||||||
await self._test_download_error_on_start(KeyFeeAboveMaxAllowed)
|
await self._test_download_error_on_start(KeyFeeAboveMaxAllowed, "")
|
||||||
|
|
||||||
async def test_resolve_error(self):
|
async def test_resolve_error(self):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
|
@ -315,14 +331,27 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
async def test_download_sd_timeout(self):
|
async def test_download_sd_timeout(self):
|
||||||
self.server.stop_server()
|
self.server.stop_server()
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
await self._test_download_error_analytics_on_start(DownloadSDTimeout, timeout=1)
|
await self._test_download_error_analytics_on_start(
|
||||||
|
DownloadSDTimeout, f'Failed to download sd blob {self.sd_hash} within timeout', timeout=1
|
||||||
|
)
|
||||||
|
|
||||||
async def test_download_data_timeout(self):
|
async def test_download_data_timeout(self):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
|
with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
|
||||||
head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
|
head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
|
||||||
self.server_blob_manager.delete_blob(head_blob_hash)
|
self.server_blob_manager.delete_blob(head_blob_hash)
|
||||||
await self._test_download_error_analytics_on_start(DownloadDataTimeout, timeout=1)
|
await self._test_download_error_analytics_on_start(
|
||||||
|
DownloadDataTimeout, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout', timeout=1
|
||||||
|
)
|
||||||
|
|
||||||
|
async def test_unexpected_error(self):
|
||||||
|
await self.setup_stream_manager()
|
||||||
|
err_msg = f"invalid blob directory '{self.client_dir}'"
|
||||||
|
shutil.rmtree(self.client_dir)
|
||||||
|
await self._test_download_error_analytics_on_start(
|
||||||
|
OSError, err_msg, timeout=1
|
||||||
|
)
|
||||||
|
os.mkdir(self.client_dir) # so the test cleanup doesn't error
|
||||||
|
|
||||||
async def test_non_head_data_timeout(self):
|
async def test_non_head_data_timeout(self):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
|
|
Loading…
Reference in a new issue