From 0f8cf466265d5b8d4d8ed8dcc87f9c3ca1186c8d Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Tue, 16 Jul 2019 23:32:05 -0300
Subject: [PATCH 1/5] refactor and fix ttfb for new api

---
 lbry/scripts/time_to_first_byte.py | 177 +++++++++++++----------------
 1 file changed, 76 insertions(+), 101 deletions(-)

diff --git a/lbry/scripts/time_to_first_byte.py b/lbry/scripts/time_to_first_byte.py
index 9abb9fdce..7835ff6a6 100644
--- a/lbry/scripts/time_to_first_byte.py
+++ b/lbry/scripts/time_to_first_byte.py
@@ -1,43 +1,19 @@
 import os
+import sys
 import json
 import argparse
 import asyncio
-import aiohttp
 import time
+import aiohttp

 from aiohttp import ClientConnectorError
 from lbry import __version__
 from lbry.blob.blob_file import MAX_BLOB_SIZE
 from lbry.conf import Config
-from lbry.schema.uri import parse_lbry_uri
 from lbry.extras.daemon.client import daemon_rpc
 from lbry.extras import system_info


-def extract_uris(response):
-    uris = list()
-    for key in response:
-        for value in response[key]:
-            uris.append(value)
-
-    return uris
-
-
-async def get_frontpage_uris():
-    session = aiohttp.ClientSession()
-    try:
-        response = await session.get("https://api.lbry.com/file/list_homepage", timeout=10.0)
-        if response.status != 200:
-            print("API returned non 200 code!!")
-            return
-        body = await response.json()
-        await session.close()
-        uris = extract_uris(body['data']['Uris'])
-        return uris
-    finally:
-        await session.close()
-
-
 async def report_to_slack(output, webhook):
     payload = {
         "text": f"lbrynet {__version__} ({system_info.get_platform()['platform']}) time to first byte:\n{output}"
@@ -58,120 +34,119 @@ def variance(times):
     return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)


-async def wait_for_done(conf, uri, timeout):
-    name = uri.split("#")[0]
-    last_complete = 0
-    hang_count = 0
+async def wait_for_done(conf, claim_name, timeout):
+    blobs_completed, last_completed = 0, time.time()
     while True:
-        files = await daemon_rpc(conf, "file_list", claim_name=name)
-        file = files[0]
+        file = (await daemon_rpc(conf, "file_list", claim_name=claim_name))[0]
         if file['status'] in ['finished', 'stopped']:
             return True, file['blobs_completed'], file['blobs_in_stream']
-        if last_complete < int(file['blobs_completed']):
-            hang_count = 0
-            last_complete = int(file['blobs_completed'])
-        else:
-            hang_count += 1
-            await asyncio.sleep(1.0)
-        if hang_count > timeout:
+        elif blobs_completed < int(file['blobs_completed']):
+            blobs_completed, last_completed = int(file['blobs_completed']), time.time()
+        elif (time.time() - last_completed) > timeout:
             return False, file['blobs_completed'], file['blobs_in_stream']
+        await asyncio.sleep(1.0)


-async def main(uris=None, cmd_args=None):
-    if not uris:
-        uris = await get_frontpage_uris()
+async def main(cmd_args=None):
+    print('Time to first byte started using parameters:')
+    for key, value in vars(cmd_args).items():
+        print(f"{key}: {value}")
     conf = Config()
+    url_to_claim = {}
     try:
-        await daemon_rpc(conf, 'status')
+        for page in range(1, cmd_args.download_pages + 1):
+            start = time.time()
+            response = await daemon_rpc(
+                conf, 'claim_search', page=page, claim_type='stream', fee_amount=None if cmd_args.allow_fees else 0,
+                order_by=['trending_global'], no_totals=True
+            )
+            if 'error' in response or not response.get('items'):
+                print(f'Error getting claim list page {page}:')
+                print(response)
+                return 1
+            else:
+                url_to_claim.update({
+                    claim['permanent_url']: claim for claim in response['items']
+                })
+            print(f'Claim search page {page} took: {time.time() - start}')
     except (ClientConnectorError, ConnectionError):
         print("Could not connect to daemon")
         return 1
-    print(f"Checking {len(uris)} uris from the front page")
     print("**********************************************")

-    resolvable = []
-    async def __resolve(name):
-        resolved = await daemon_rpc(conf, 'resolve', urls=[name])
-        if 'error' not in resolved.get(name, {}):
-            if ("fee" not in resolved[name]['claim']['value']) or cmd_args.allow_fees:
-                resolvable.append(name)
-            else:
-                print(f"{name} has a fee, skipping it")
-        else:
-            print(f"failed to resolve {name}: {resolved[name]['error']}")
-    await asyncio.gather(*(__resolve(name) for name in uris))
-    print(f"attempting to download {len(resolvable)}/{len(uris)} frontpage streams")
+    print(f"Attempting to download {len(url_to_claim)} claim_search streams")

     first_byte_times = []
     download_speeds = []
     download_successes = []
-    failed_to_start = []
-    download_failures = []
+    failed_to = {}

-    for uri in resolvable:
-        await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+    await asyncio.gather(*(
+        daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=claim['name'])
+        for claim in url_to_claim.values() if not cmd_args.keep_files
+    ))

-    for i, uri in enumerate(resolvable):
+    for i, (url, claim) in enumerate(url_to_claim.items()):
         start = time.time()
-        try:
-            await daemon_rpc(conf, 'get', uri=uri, save_file=True)
-            first_byte = time.time()
-            first_byte_times.append(first_byte - start)
-            print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            if not cmd_args.head_blob_only:
-                downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
-                    conf, uri, cmd_args.stall_download_timeout
-                )
-                if downloaded:
-                    download_successes.append(uri)
-                else:
-                    download_failures.append(uri)
-                mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
-                download_speeds.append(mbs)
-                print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
-                      f"{mbs}mb/s")
-        except Exception as e:
-            print(f"{i + 1}/{len(uris)} - failed to start {uri}: {e}")
-            failed_to_start.append(uri)
+        response = await daemon_rpc(conf, 'get', uri=url, save_file=not cmd_args.head_blob_only)
+        if 'error' in response:
+            print(f"{i + 1}/{len(url_to_claim)} - failed to start {url}: {response['error']}")
+            failed_to[url] = 'start'
             if cmd_args.exit_on_error:
                 return
-        if cmd_args.delete_after_download or cmd_args.head_blob_only:
-            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+            continue
+        first_byte = time.time()
+        first_byte_times.append(first_byte - start)
+        print(f"{i + 1}/{len(url_to_claim)} - {first_byte - start} {url}")
+        if not cmd_args.head_blob_only:
+            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
+                conf, claim['name'], cmd_args.stall_download_timeout
+            )
+            if downloaded:
+                download_successes.append(url)
+            else:
+                failed_to[url] = 'finish'
+            mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
+            download_speeds.append(mbs)
+            print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {url} at "
+                  f"{mbs}mb/s")
+        if not cmd_args.keep_files:
+            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=claim['name'])
         await asyncio.sleep(0.1)

     print("**********************************************")
-    result = f"Started {len(first_byte_times)} of {len(resolvable)} attempted front page streams\n" \
-             f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \
-             f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
-             f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
-             f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
-             f"Variance: {variance(first_byte_times)}\n"
-    if not cmd_args.head_blob_only:
-        result += f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
+    result = f"Started {len(first_byte_times)} of {len(url_to_claim)} attempted front page streams\n"
+    if first_byte_times:
+        result += f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \
+                  f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
+                  f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
+                  f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
+                  f"Variance: {variance(first_byte_times)}\n"
+    if download_successes:
+        result += f"Downloaded {len(download_successes)}/{len(url_to_claim)}\n" \
                   f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
                   f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
                   f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
                   f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
-    if failed_to_start:
-        result += "\nFailed to start:" + "\n".join([f for f in failed_to_start])
-    if download_failures:
-        result += "\nFailed to finish:" + "\n".join([f for f in download_failures])
+    for reason in ('start', 'finish'):
+        failures = [url for url, why in failed_to.items() if reason == why]
+        if failures:
+            result += f"\nFailed to {reason}:\n" + "\n".join(failures)
     print(result)

     webhook = os.environ.get('TTFB_SLACK_TOKEN', None)
     if webhook:
         await report_to_slack(result, webhook)
+    return 0


 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    #parser.add_argument("--data_dir")
-    #parser.add_argument("--wallet_dir")
-    #parser.add_argument("--download_directory")
     parser.add_argument("--allow_fees", action='store_true')
     parser.add_argument("--exit_on_error", action='store_true')
-    parser.add_argument("--stall_download_timeout", default=10, type=int)
-    parser.add_argument("--delete_after_download", action='store_true')
+    parser.add_argument("--stall_download_timeout", default=0, type=int)
+    parser.add_argument("--keep_files", action='store_true')
     parser.add_argument("--head_blob_only", action='store_true')
-    asyncio.run(main(cmd_args=parser.parse_args()))
+    parser.add_argument("--download_pages", type=int, default=10)
+    sys.exit(asyncio.run(main(cmd_args=parser.parse_args())) or 0)

From a08232f1643c7f5fb252963c6669c1fad71a0dad Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Sun, 28 Jul 2019 21:18:13 -0400
Subject: [PATCH 2/5] fix claim search kwargs in time to first byte script

---
 lbry/scripts/time_to_first_byte.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/lbry/scripts/time_to_first_byte.py b/lbry/scripts/time_to_first_byte.py
index 7835ff6a6..4c921b7c1 100644
--- a/lbry/scripts/time_to_first_byte.py
+++ b/lbry/scripts/time_to_first_byte.py
@@ -55,10 +55,19 @@ async def main(cmd_args=None):
     url_to_claim = {}
     try:
         for page in range(1, cmd_args.download_pages + 1):
-            start = time.time()
+            start = time.perf_counter()
+            kwargs = {
+                'page': page,
+                # 'claim_type': 'stream',
+                'order_by': ['trending_global'],
+                'no_totals': True
+            }
+
+            # if not cmd_args.allow_fees:
+            #     kwargs['fee_amount'] = 0
+
             response = await daemon_rpc(
-                conf, 'claim_search', page=page, claim_type='stream', fee_amount=None if cmd_args.allow_fees else 0,
-                order_by=['trending_global'], no_totals=True
+                conf, 'claim_search', **kwargs
             )
             if 'error' in response or not response.get('items'):
                 print(f'Error getting claim list page {page}:')

From c876d891fad51c9826ab91beaaff08c86bd2bd04 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Sun, 28 Jul 2019 21:18:29 -0400
Subject: [PATCH 3/5] use perf_counter

---
 lbry/scripts/time_to_first_byte.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/lbry/scripts/time_to_first_byte.py b/lbry/scripts/time_to_first_byte.py
index 4c921b7c1..0aea73764 100644
--- a/lbry/scripts/time_to_first_byte.py
+++ b/lbry/scripts/time_to_first_byte.py
@@ -35,14 +35,14 @@ def variance(times):


 async def wait_for_done(conf, claim_name, timeout):
-    blobs_completed, last_completed = 0, time.time()
+    blobs_completed, last_completed = 0, time.perf_counter()
     while True:
         file = (await daemon_rpc(conf, "file_list", claim_name=claim_name))[0]
         if file['status'] in ['finished', 'stopped']:
             return True, file['blobs_completed'], file['blobs_in_stream']
         elif blobs_completed < int(file['blobs_completed']):
-            blobs_completed, last_completed = int(file['blobs_completed']), time.time()
-        elif (time.time() - last_completed) > timeout:
+            blobs_completed, last_completed = int(file['blobs_completed']), time.perf_counter()
+        elif (time.perf_counter() - last_completed) > timeout:
             return False, file['blobs_completed'], file['blobs_in_stream']
         await asyncio.sleep(1.0)

@@ -77,12 +77,11 @@ async def main(cmd_args=None):
                 url_to_claim.update({
                     claim['permanent_url']: claim for claim in response['items']
                 })
-            print(f'Claim search page {page} took: {time.time() - start}')
+            print(f'Claim search page {page} took: {time.perf_counter() - start}')
     except (ClientConnectorError, ConnectionError):
         print("Could not connect to daemon")
         return 1
     print("**********************************************")
-
     print(f"Attempting to download {len(url_to_claim)} claim_search streams")

     first_byte_times = []
@@ -95,7 +95,7 @@ async def main(cmd_args=None):
     ))

     for i, (url, claim) in enumerate(url_to_claim.items()):
-        start = time.time()
+        start = time.perf_counter()
         response = await daemon_rpc(conf, 'get', uri=url, save_file=not cmd_args.head_blob_only)
         if 'error' in response:
             print(f"{i + 1}/{len(url_to_claim)} - failed to start {url}: {response['error']}")
@@ -104,7 +104,7 @@ async def main(cmd_args=None):
             if cmd_args.exit_on_error:
                 return
             continue
-        first_byte = time.time()
+        first_byte = time.perf_counter()
         first_byte_times.append(first_byte - start)
         print(f"{i + 1}/{len(url_to_claim)} - {first_byte - start} {url}")
         if not cmd_args.head_blob_only:
@@ -115,7 +115,7 @@ async def main(cmd_args=None):
                 download_successes.append(url)
             else:
                 failed_to[url] = 'finish'
-            mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
+            mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.perf_counter() - start) / 1000000, 2)
             download_speeds.append(mbs)
             print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {url} at "
                   f"{mbs}mb/s")

From 1a0680ead9c111e141c70c379bcab91113e73480 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Sun, 28 Jul 2019 21:22:07 -0400
Subject: [PATCH 4/5] update claim_search_performance.py, add new test cases shown by time_to_first_byte.py

---
 lbry/scripts/claim_search_performance.py | 123 ++++++++++++++---
 1 file changed, 77 insertions(+), 46 deletions(-)

diff --git a/lbry/scripts/claim_search_performance.py b/lbry/scripts/claim_search_performance.py
index 88c03d373..21af64743 100644
--- a/lbry/scripts/claim_search_performance.py
+++ b/lbry/scripts/claim_search_performance.py
@@ -1,5 +1,6 @@
 import os
 import time
+import textwrap
 import argparse
 import asyncio
 import logging
@@ -9,7 +10,7 @@ from lbry.wallet.ledger import MainNetLedger

 log = logging.getLogger(__name__)
 log.addHandler(logging.StreamHandler())
-log.setLevel(logging.INFO)
+log.setLevel(logging.CRITICAL)

 DEFAULT_ANY_TAGS = [
     'blockchain',
@@ -68,43 +69,46 @@ def get_args(limit=20):
     args = []
     any_tags_combinations = [DEFAULT_ANY_TAGS, COMMON_AND_RARE, RARE_ANY_TAGS, COMMON_AND_RARE2, CITY_FIX, []]
     not_tags_combinations = [MATURE_TAGS, []]
-    for no_totals in [True]:
-        for offset in [0, 100]:
-            for any_tags in any_tags_combinations:
-                for not_tags in not_tags_combinations:
-                    for order_by in ORDER_BY:
-                        kw = {
-                            'order_by': order_by,
-                            'offset': offset,
-                            'limit': limit,
-                            'no_totals': no_totals
-                        }
-                        if not_tags:
-                            kw['not_tags'] = not_tags
-                        if any_tags:
-                            kw['any_tags'] = any_tags
-                        args.append(kw)
-    print(len(args), "argument combinations")
+    for no_fee in [False, True]:
+        for claim_type in [None, 'stream', 'channel']:
+            for no_totals in [True]:
+                for offset in [0, 100]:
+                    for any_tags in any_tags_combinations:
+                        for not_tags in not_tags_combinations:
+                            for order_by in ORDER_BY:
+                                kw = {
+                                    'order_by': order_by,
+                                    'offset': offset,
+                                    'limit': limit,
+                                    'no_totals': no_totals
+                                }
+                                if not_tags:
+                                    kw['not_tags'] = not_tags
+                                if any_tags:
+                                    kw['any_tags'] = any_tags
+                                if claim_type:
+                                    kw['claim_type'] = claim_type
+                                if no_fee:
+                                    kw['fee_amount'] = 0
+                                args.append(kw)
+    print(f"-- Trying {len(args)} argument combinations")
     return args


 def _search(kwargs):
-    start = time.time()
+    start = time.perf_counter()
+    error = None
     try:
         search_to_bytes(kwargs)
-        t = time.time() - start
-        return t, kwargs
     except Exception as err:
-        return -1, f"failed: error={str(type(err))}({str(err)})"
+        error = str(err)
+    return time.perf_counter() - start, kwargs, error


 async def search(executor, kwargs):
-    try:
-        return await asyncio.get_running_loop().run_in_executor(
-            executor, _search, kwargs
-        )
-    except Exception as err:
-        return f"failed (err={str(type(err))}({err}))- {kwargs}"
+    return await asyncio.get_running_loop().run_in_executor(
+        executor, _search, kwargs
+    )


 async def main(db_path, max_query_time):
@@ -115,23 +119,50 @@
     tasks = [search(query_executor, constraints) for constraints in get_args()]
     try:
         results = await asyncio.gather(*tasks)
-        for ts, constraints in results:
-            if ts >= max_query_time:
-                sql = interpolate(*_get_claims("""
-                claimtrie.claim_hash as is_controlling,
-                claimtrie.last_take_over_height,
-                claim.claim_hash, claim.txo_hash,
-                claim.claims_in_channel,
-                claim.height, claim.creation_height,
-                claim.activation_height, claim.expiration_height,
-                claim.effective_amount, claim.support_amount,
-                claim.trending_group, claim.trending_mixed,
-                claim.trending_local, claim.trending_global,
-                claim.short_url, claim.canonical_url,
-                claim.channel_hash, channel.txo_hash AS channel_txo_hash,
-                channel.height AS channel_height, claim.signature_valid
-                """, **constraints))
-                print(f"Query took {int(ts * 1000)}ms\n{sql}")
+        query_times = [
+            {
+                'sql': interpolate(*_get_claims("""
+                claimtrie.claim_hash as is_controlling,
+                claimtrie.last_take_over_height,
+                claim.claim_hash, claim.txo_hash,
+                claim.claims_in_channel,
+                claim.height, claim.creation_height,
+                claim.activation_height, claim.expiration_height,
+                claim.effective_amount, claim.support_amount,
+                claim.trending_group, claim.trending_mixed,
+                claim.trending_local, claim.trending_global,
+                claim.short_url, claim.canonical_url,
+                claim.channel_hash, channel.txo_hash AS channel_txo_hash,
+                channel.height AS channel_height, claim.signature_valid
+                """, **constraints)),
+                'duration': ts,
+                'error': error
+            }
+            for ts, constraints, error in results
+        ]
+        errored = [query_info for query_info in query_times if query_info['error']]
+        errors = {str(query_info['error']): [] for query_info in errored}
+        for error in errored:
+            errors[str(error['error'])].append(error['sql'])
+        slow = [
+            query_info for query_info in query_times
+            if not query_info['error'] and query_info['duration'] > (max_query_time / 2.0)
+        ]
+        fast = [
+            query_info for query_info in query_times
+            if not query_info['error'] and query_info['duration'] <= (max_query_time / 2.0)
+        ]
+        print(f"-- {len(fast)} queries were fast")
+        slow.sort(key=lambda query_info: query_info['duration'], reverse=True)
+        print(f"-- Failing queries:")
+        for error in errors:
+            print(f"-- Failure: \"{error}\"")
+            for failing_query in errors[error]:
+                print(f"{textwrap.dedent(failing_query)};\n")
+            print()
+        print(f"-- Slow queries:")
+        for slow_query in slow:
+            print(f"-- Query took {slow_query['duration']}\n{textwrap.dedent(slow_query['sql'])};\n")
     finally:
         query_executor.shutdown()

@@ -139,7 +170,7 @@
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument('--db_path', dest='db_path', default=os.path.expanduser('~/claims.db'), type=str)
-    parser.add_argument('--max_time', dest='max_time', default=0.0, type=float)
+    parser.add_argument('--max_time', dest='max_time', default=0.25, type=float)
     args = parser.parse_args()
     db_path = args.db_path
     max_query_time = args.max_time

From 356b3c85e563bb5a9614ce50ca51cd5e2889d188 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Sun, 28 Jul 2019 21:42:04 -0400
Subject: [PATCH 5/5] use app default order_by

---
 lbry/scripts/time_to_first_byte.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/scripts/time_to_first_byte.py b/lbry/scripts/time_to_first_byte.py
index 0aea73764..12b492de4 100644
--- a/lbry/scripts/time_to_first_byte.py
+++ b/lbry/scripts/time_to_first_byte.py
@@ -59,7 +59,7 @@ async def main(cmd_args=None):
             kwargs = {
                 'page': page,
                 # 'claim_type': 'stream',
-                'order_by': ['trending_global'],
+                'order_by': ['trending_global', 'trending_mixed'],
                 'no_totals': True
             }
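Taken together, the series leaves time_to_first_byte.py measuring TTFB as the wall-clock interval between issuing the daemon's 'get' RPC and its return, timed with time.perf_counter(). The sketch below restates that measurement loop in isolation as a reading aid; it reuses daemon_rpc and Config the same way the script does, but the measure_ttfb helper and the example URL are illustrative stand-ins, not part of these patches, and a running lbrynet daemon is assumed.

import asyncio
import time

from lbry.conf import Config
from lbry.extras.daemon.client import daemon_rpc


async def measure_ttfb(urls):
    # Same timing pattern as the patched script: read perf_counter() right
    # before the 'get' RPC and again when the call returns.
    conf = Config()
    ttfb = {}
    for url in urls:
        start = time.perf_counter()
        response = await daemon_rpc(conf, 'get', uri=url, save_file=False)
        if 'error' not in response:
            ttfb[url] = time.perf_counter() - start
    return ttfb


# Hypothetical usage; replace the URL with a real, resolvable claim:
# asyncio.run(measure_ttfb(['lbry://some-claim']))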