forked from LBRYCommunity/lbry-sdk
Merge pull request #2379 from lbryio/add-wallet-server-to-ttfb-analytics
Add `connected_servers` field to wallet status and improve analytics for wallet related download errors
This commit is contained in:
commit
6fd0c2bf77
5 changed files with 64 additions and 11 deletions
|
@ -239,7 +239,15 @@ class WalletComponent(Component):
|
||||||
local_height = self.wallet_manager.ledger.headers.height
|
local_height = self.wallet_manager.ledger.headers.height
|
||||||
remote_height = self.wallet_manager.ledger.network.remote_height
|
remote_height = self.wallet_manager.ledger.network.remote_height
|
||||||
best_hash = self.wallet_manager.get_best_blockhash()
|
best_hash = self.wallet_manager.get_best_blockhash()
|
||||||
|
server = self.wallet_manager.ledger.network.client.server
|
||||||
return {
|
return {
|
||||||
|
'connected_servers': [
|
||||||
|
{
|
||||||
|
'host': server[0],
|
||||||
|
'port': server[1],
|
||||||
|
'latency': 0 # TODO: use real latency
|
||||||
|
}
|
||||||
|
],
|
||||||
'blocks': max(local_height, 0),
|
'blocks': max(local_height, 0),
|
||||||
'blocks_behind': max(remote_height - local_height, 0),
|
'blocks_behind': max(remote_height - local_height, 0),
|
||||||
'best_blockhash': best_hash,
|
'best_blockhash': best_hash,
|
||||||
|
|
|
@ -747,6 +747,13 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
'best_blockhash': (str) block hash of most recent block,
|
'best_blockhash': (str) block hash of most recent block,
|
||||||
'is_encrypted': (bool),
|
'is_encrypted': (bool),
|
||||||
'is_locked': (bool),
|
'is_locked': (bool),
|
||||||
|
'connected_servers': (list) [
|
||||||
|
{
|
||||||
|
'host': (str) server hostname,
|
||||||
|
'port': (int) server port,
|
||||||
|
'latency': (int) milliseconds
|
||||||
|
}
|
||||||
|
],
|
||||||
},
|
},
|
||||||
'dht': {
|
'dht': {
|
||||||
'node_id': (str) lbry dht node id - hex encoded,
|
'node_id': (str) lbry dht node id - hex encoded,
|
||||||
|
|
|
@ -51,13 +51,15 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float
|
||||||
head_blob_hash: typing.Optional[str] = None,
|
head_blob_hash: typing.Optional[str] = None,
|
||||||
head_blob_length: typing.Optional[int] = None,
|
head_blob_length: typing.Optional[int] = None,
|
||||||
head_blob_download_duration: typing.Optional[float] = None,
|
head_blob_download_duration: typing.Optional[float] = None,
|
||||||
error: typing.Optional[str] = None) -> typing.Dict:
|
error: typing.Optional[str] = None, error_msg: typing.Optional[str] = None,
|
||||||
|
wallet_server: typing.Optional[str] = None) -> typing.Dict:
|
||||||
return {
|
return {
|
||||||
"external_ip": external_ip,
|
"external_ip": external_ip,
|
||||||
"download_id": download_id,
|
"download_id": download_id,
|
||||||
"total_duration": round(total_duration, 4),
|
"total_duration": round(total_duration, 4),
|
||||||
"resolve_duration": None if not resolve_duration else round(resolve_duration, 4),
|
"resolve_duration": None if not resolve_duration else round(resolve_duration, 4),
|
||||||
"error": error,
|
"error": error,
|
||||||
|
"error_message": error_msg,
|
||||||
'name': name,
|
'name': name,
|
||||||
"outpoint": outpoint,
|
"outpoint": outpoint,
|
||||||
|
|
||||||
|
@ -77,7 +79,8 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float
|
||||||
"head_blob_length": head_blob_length,
|
"head_blob_length": head_blob_length,
|
||||||
"head_blob_duration": None if not head_blob_download_duration else round(head_blob_download_duration, 4),
|
"head_blob_duration": None if not head_blob_download_duration else round(head_blob_download_duration, 4),
|
||||||
|
|
||||||
"connection_failures_count": connection_failures_count
|
"connection_failures_count": connection_failures_count,
|
||||||
|
"wallet_server": wallet_server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -184,11 +187,14 @@ class AnalyticsManager:
|
||||||
head_blob_hash: typing.Optional[str] = None,
|
head_blob_hash: typing.Optional[str] = None,
|
||||||
head_blob_length: typing.Optional[int] = None,
|
head_blob_length: typing.Optional[int] = None,
|
||||||
head_blob_duration: typing.Optional[int] = None,
|
head_blob_duration: typing.Optional[int] = None,
|
||||||
error: typing.Optional[str] = None):
|
error: typing.Optional[str] = None,
|
||||||
|
error_msg: typing.Optional[str] = None,
|
||||||
|
wallet_server: typing.Optional[str] = None):
|
||||||
await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
|
await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
|
||||||
self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
|
self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
|
||||||
found_peers_count, tried_peers_count, connection_failures_count, added_fixed_peers, fixed_peers_delay,
|
found_peers_count, tried_peers_count, connection_failures_count, added_fixed_peers, fixed_peers_delay,
|
||||||
sd_hash, sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error
|
sd_hash, sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error, error_msg,
|
||||||
|
wallet_server
|
||||||
)))
|
)))
|
||||||
|
|
||||||
async def send_download_finished(self, download_id, name, sd_hash):
|
async def send_download_finished(self, download_id, name, sd_hash):
|
||||||
|
|
|
@ -448,6 +448,7 @@ class StreamManager:
|
||||||
finally:
|
finally:
|
||||||
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
|
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
|
||||||
stream.downloader.time_to_first_bytes))):
|
stream.downloader.time_to_first_bytes))):
|
||||||
|
server = self.wallet.ledger.network.client.server
|
||||||
self.loop.create_task(
|
self.loop.create_task(
|
||||||
self.analytics_manager.send_time_to_first_bytes(
|
self.analytics_manager.send_time_to_first_bytes(
|
||||||
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
|
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
|
||||||
|
@ -462,7 +463,9 @@ class StreamManager:
|
||||||
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
|
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
|
||||||
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
|
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
|
||||||
None if not stream else stream.downloader.time_to_first_bytes,
|
None if not stream else stream.downloader.time_to_first_bytes,
|
||||||
None if not error else error.__class__.__name__
|
None if not error else error.__class__.__name__,
|
||||||
|
None if not error else str(error),
|
||||||
|
None if not server else f"{server[0]}:{server[1]}"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
import binascii
|
import binascii
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
import asyncio
|
import asyncio
|
||||||
|
@ -77,6 +78,7 @@ def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
|
||||||
|
|
||||||
mock_wallet = mock.Mock(spec=LbryWalletManager)
|
mock_wallet = mock.Mock(spec=LbryWalletManager)
|
||||||
mock_wallet.ledger.resolve = mock_resolve
|
mock_wallet.ledger.resolve = mock_resolve
|
||||||
|
mock_wallet.ledger.network.client.server = ('fakespv.lbry.com', 50001)
|
||||||
|
|
||||||
async def get_balance(*_):
|
async def get_balance(*_):
|
||||||
return balance
|
return balance
|
||||||
|
@ -130,6 +132,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
head_blob_duration = event['properties']['head_blob_duration']
|
head_blob_duration = event['properties']['head_blob_duration']
|
||||||
sd_blob_duration = event['properties']['sd_blob_duration']
|
sd_blob_duration = event['properties']['sd_blob_duration']
|
||||||
self.assertFalse(event['properties']['added_fixed_peers'])
|
self.assertFalse(event['properties']['added_fixed_peers'])
|
||||||
|
self.assertEqual(event['properties']['wallet_server'], "fakespv.lbry.com:50001")
|
||||||
self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration)
|
self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration)
|
||||||
|
|
||||||
await self._test_time_to_first_bytes(check_post)
|
await self._test_time_to_first_bytes(check_post)
|
||||||
|
@ -172,6 +175,9 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertIsNone(event['properties']['sd_blob_duration'])
|
self.assertIsNone(event['properties']['sd_blob_duration'])
|
||||||
self.assertFalse(event['properties']['added_fixed_peers'])
|
self.assertFalse(event['properties']['added_fixed_peers'])
|
||||||
self.assertEqual(event['properties']['connection_failures_count'], 1)
|
self.assertEqual(event['properties']['connection_failures_count'], 1)
|
||||||
|
self.assertEqual(
|
||||||
|
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout'
|
||||||
|
)
|
||||||
|
|
||||||
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout, after_setup=after_setup)
|
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout, after_setup=after_setup)
|
||||||
|
|
||||||
|
@ -213,6 +219,9 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertEqual(event['properties']['use_fixed_peers'], False)
|
self.assertEqual(event['properties']['use_fixed_peers'], False)
|
||||||
self.assertEqual(event['properties']['added_fixed_peers'], False)
|
self.assertEqual(event['properties']['added_fixed_peers'], False)
|
||||||
self.assertEqual(event['properties']['fixed_peer_delay'], None)
|
self.assertEqual(event['properties']['fixed_peer_delay'], None)
|
||||||
|
self.assertEqual(
|
||||||
|
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout'
|
||||||
|
)
|
||||||
|
|
||||||
start = self.loop.time()
|
start = self.loop.time()
|
||||||
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout)
|
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout)
|
||||||
|
@ -272,14 +281,21 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertListEqual(expected_events, received)
|
self.assertListEqual(expected_events, received)
|
||||||
|
|
||||||
async def _test_download_error_on_start(self, expected_error, timeout=None):
|
async def _test_download_error_on_start(self, expected_error, timeout=None):
|
||||||
with self.assertRaises(expected_error):
|
error = None
|
||||||
|
try:
|
||||||
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout)
|
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout)
|
||||||
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError):
|
||||||
|
raise
|
||||||
|
error = err
|
||||||
|
self.assertEqual(expected_error, type(error))
|
||||||
|
|
||||||
async def _test_download_error_analytics_on_start(self, expected_error, timeout=None):
|
async def _test_download_error_analytics_on_start(self, expected_error, error_message, timeout=None):
|
||||||
received = []
|
received = []
|
||||||
|
|
||||||
async def check_post(event):
|
async def check_post(event):
|
||||||
self.assertEqual("Time To First Bytes", event['event'])
|
self.assertEqual("Time To First Bytes", event['event'])
|
||||||
|
self.assertEqual(event['properties']['error_message'], error_message)
|
||||||
received.append(event['properties']['error'])
|
received.append(event['properties']['error'])
|
||||||
|
|
||||||
self.stream_manager.analytics_manager._post = check_post
|
self.stream_manager.analytics_manager._post = check_post
|
||||||
|
@ -295,7 +311,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
'version': '_0_0_1'
|
'version': '_0_0_1'
|
||||||
}
|
}
|
||||||
await self.setup_stream_manager(10.0, fee)
|
await self.setup_stream_manager(10.0, fee)
|
||||||
await self._test_download_error_on_start(InsufficientFundsError)
|
await self._test_download_error_on_start(InsufficientFundsError, "")
|
||||||
|
|
||||||
async def test_fee_above_max_allowed(self):
|
async def test_fee_above_max_allowed(self):
|
||||||
fee = {
|
fee = {
|
||||||
|
@ -305,7 +321,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
'version': '_0_0_1'
|
'version': '_0_0_1'
|
||||||
}
|
}
|
||||||
await self.setup_stream_manager(1000000.0, fee)
|
await self.setup_stream_manager(1000000.0, fee)
|
||||||
await self._test_download_error_on_start(KeyFeeAboveMaxAllowed)
|
await self._test_download_error_on_start(KeyFeeAboveMaxAllowed, "")
|
||||||
|
|
||||||
async def test_resolve_error(self):
|
async def test_resolve_error(self):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
|
@ -315,14 +331,27 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
async def test_download_sd_timeout(self):
|
async def test_download_sd_timeout(self):
|
||||||
self.server.stop_server()
|
self.server.stop_server()
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
await self._test_download_error_analytics_on_start(DownloadSDTimeout, timeout=1)
|
await self._test_download_error_analytics_on_start(
|
||||||
|
DownloadSDTimeout, f'Failed to download sd blob {self.sd_hash} within timeout', timeout=1
|
||||||
|
)
|
||||||
|
|
||||||
async def test_download_data_timeout(self):
|
async def test_download_data_timeout(self):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
|
with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
|
||||||
head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
|
head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
|
||||||
self.server_blob_manager.delete_blob(head_blob_hash)
|
self.server_blob_manager.delete_blob(head_blob_hash)
|
||||||
await self._test_download_error_analytics_on_start(DownloadDataTimeout, timeout=1)
|
await self._test_download_error_analytics_on_start(
|
||||||
|
DownloadDataTimeout, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout', timeout=1
|
||||||
|
)
|
||||||
|
|
||||||
|
async def test_unexpected_error(self):
|
||||||
|
await self.setup_stream_manager()
|
||||||
|
err_msg = f"invalid blob directory '{self.client_dir}'"
|
||||||
|
shutil.rmtree(self.client_dir)
|
||||||
|
await self._test_download_error_analytics_on_start(
|
||||||
|
OSError, err_msg, timeout=1
|
||||||
|
)
|
||||||
|
os.mkdir(self.client_dir) # so the test cleanup doesn't error
|
||||||
|
|
||||||
async def test_non_head_data_timeout(self):
|
async def test_non_head_data_timeout(self):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
|
|
Loading…
Add table
Reference in a new issue