Merge pull request from lbryio/refactor_ttfb

refactor and fix ttfb for new api
Jack Robison 2019-07-28 22:28:02 -04:00 committed by GitHub
commit 726129836a
2 changed files with 163 additions and 149 deletions


@@ -1,5 +1,6 @@
 import os
 import time
+import textwrap
 import argparse
 import asyncio
 import logging
@@ -9,7 +10,7 @@ from lbry.wallet.ledger import MainNetLedger
 log = logging.getLogger(__name__)
 log.addHandler(logging.StreamHandler())
-log.setLevel(logging.INFO)
+log.setLevel(logging.CRITICAL)

 DEFAULT_ANY_TAGS = [
     'blockchain',
@@ -68,43 +69,46 @@ def get_args(limit=20):
     args = []
     any_tags_combinations = [DEFAULT_ANY_TAGS, COMMON_AND_RARE, RARE_ANY_TAGS, COMMON_AND_RARE2, CITY_FIX, []]
     not_tags_combinations = [MATURE_TAGS, []]
-    for no_totals in [True]:
-        for offset in [0, 100]:
-            for any_tags in any_tags_combinations:
-                for not_tags in not_tags_combinations:
-                    for order_by in ORDER_BY:
-                        kw = {
-                            'order_by': order_by,
-                            'offset': offset,
-                            'limit': limit,
-                            'no_totals': no_totals
-                        }
-                        if not_tags:
-                            kw['not_tags'] = not_tags
-                        if any_tags:
-                            kw['any_tags'] = any_tags
-                        args.append(kw)
-    print(len(args), "argument combinations")
+    for no_fee in [False, True]:
+        for claim_type in [None, 'stream', 'channel']:
+            for no_totals in [True]:
+                for offset in [0, 100]:
+                    for any_tags in any_tags_combinations:
+                        for not_tags in not_tags_combinations:
+                            for order_by in ORDER_BY:
+                                kw = {
+                                    'order_by': order_by,
+                                    'offset': offset,
+                                    'limit': limit,
+                                    'no_totals': no_totals
+                                }
+                                if not_tags:
+                                    kw['not_tags'] = not_tags
+                                if any_tags:
+                                    kw['any_tags'] = any_tags
+                                if claim_type:
+                                    kw['claim_type'] = claim_type
+                                if no_fee:
+                                    kw['fee_amount'] = 0
+                                args.append(kw)
+    print(f"-- Trying {len(args)} argument combinations")
     return args


 def _search(kwargs):
-    start = time.time()
+    start = time.perf_counter()
+    error = None
     try:
         search_to_bytes(kwargs)
-        t = time.time() - start
-        return t, kwargs
     except Exception as err:
-        return -1, f"failed: error={str(type(err))}({str(err)})"
+        error = str(err)
+    return time.perf_counter() - start, kwargs, error


 async def search(executor, kwargs):
-    try:
-        return await asyncio.get_running_loop().run_in_executor(
-            executor, _search, kwargs
-        )
-    except Exception as err:
-        return f"failed (err={str(type(err))}({err}))- {kwargs}"
+    return await asyncio.get_running_loop().run_in_executor(
+        executor, _search, kwargs
+    )


 async def main(db_path, max_query_time):
@@ -115,23 +119,50 @@ async def main(db_path, max_query_time):
     tasks = [search(query_executor, constraints) for constraints in get_args()]
     try:
         results = await asyncio.gather(*tasks)
-        for ts, constraints in results:
-            if ts >= max_query_time:
-                sql = interpolate(*_get_claims("""
+        query_times = [
+            {
+                'sql': interpolate(*_get_claims("""
                     claimtrie.claim_hash as is_controlling,
                     claimtrie.last_take_over_height,
                     claim.claim_hash, claim.txo_hash,
                     claim.claims_in_channel,
                     claim.height, claim.creation_height,
                     claim.activation_height, claim.expiration_height,
                     claim.effective_amount, claim.support_amount,
                     claim.trending_group, claim.trending_mixed,
                     claim.trending_local, claim.trending_global,
                     claim.short_url, claim.canonical_url,
                     claim.channel_hash, channel.txo_hash AS channel_txo_hash,
                     channel.height AS channel_height, claim.signature_valid
-                    """, **constraints))
-                print(f"Query took {int(ts * 1000)}ms\n{sql}")
+                    """, **constraints)),
+                'duration': ts,
+                'error': error
+            }
+            for ts, constraints, error in results
+        ]
+        errored = [query_info for query_info in query_times if query_info['error']]
+        errors = {str(query_info['error']): [] for query_info in errored}
+        for error in errored:
+            errors[str(error['error'])].append(error['sql'])
+        slow = [
+            query_info for query_info in query_times
+            if not query_info['error'] and query_info['duration'] > (max_query_time / 2.0)
+        ]
+        fast = [
+            query_info for query_info in query_times
+            if not query_info['error'] and query_info['duration'] <= (max_query_time / 2.0)
+        ]
+        print(f"-- {len(fast)} queries were fast")
+        slow.sort(key=lambda query_info: query_info['duration'], reverse=True)
+        print(f"-- Failing queries:")
+        for error in errors:
+            print(f"-- Failure: \"{error}\"")
+            for failing_query in errors[error]:
+                print(f"{textwrap.dedent(failing_query)};\n")
+            print()
+        print(f"-- Slow queries:")
+        for slow_query in slow:
+            print(f"-- Query took {slow_query['duration']}\n{textwrap.dedent(slow_query['sql'])};\n")
     finally:
         query_executor.shutdown()
@@ -139,7 +170,7 @@ async def main(db_path, max_query_time):
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument('--db_path', dest='db_path', default=os.path.expanduser('~/claims.db'), type=str)
-    parser.add_argument('--max_time', dest='max_time', default=0.0, type=float)
+    parser.add_argument('--max_time', dest='max_time', default=0.25, type=float)
     args = parser.parse_args()
     db_path = args.db_path
     max_query_time = args.max_time
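Note on the timing refactor above: `_search` now always returns a `(duration, kwargs, error)` triple instead of raising, and `main` buckets the results into failed, slow (more than half of `--max_time`), and fast queries. A minimal standalone sketch of that pattern is below; `run_query` is a placeholder for `search_to_bytes` and is not part of this patch.

import time


def timed_call(run_query, kwargs):
    # Mirror of the refactored _search: measure with perf_counter and capture
    # the error as a string instead of letting it propagate.
    start = time.perf_counter()
    error = None
    try:
        run_query(kwargs)
    except Exception as err:
        error = str(err)
    return time.perf_counter() - start, kwargs, error


def bucket_results(results, max_query_time=0.25):
    # Same split main() performs: errored queries, queries slower than half
    # the time budget, and everything else counted as fast.
    errored = [r for r in results if r[2]]
    slow = [r for r in results if not r[2] and r[0] > max_query_time / 2.0]
    fast = [r for r in results if not r[2] and r[0] <= max_query_time / 2.0]
    return errored, slow, fast

With the new default of `--max_time 0.25`, anything slower than 125ms lands in the slow bucket.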


@@ -1,43 +1,19 @@
 import os
+import sys
 import json
 import argparse
 import asyncio
-import aiohttp
 import time
+import aiohttp
 from aiohttp import ClientConnectorError
 from lbry import __version__
 from lbry.blob.blob_file import MAX_BLOB_SIZE
 from lbry.conf import Config
-from lbry.schema.uri import parse_lbry_uri
 from lbry.extras.daemon.client import daemon_rpc
 from lbry.extras import system_info
-
-
-def extract_uris(response):
-    uris = list()
-    for key in response:
-        for value in response[key]:
-            uris.append(value)
-
-    return uris
-
-
-async def get_frontpage_uris():
-    session = aiohttp.ClientSession()
-    try:
-        response = await session.get("https://api.lbry.com/file/list_homepage", timeout=10.0)
-        if response.status != 200:
-            print("API returned non 200 code!!")
-            return
-        body = await response.json()
-        await session.close()
-        uris = extract_uris(body['data']['Uris'])
-        return uris
-    finally:
-        await session.close()


 async def report_to_slack(output, webhook):
     payload = {
         "text": f"lbrynet {__version__} ({system_info.get_platform()['platform']}) time to first byte:\n{output}"
@@ -58,120 +34,127 @@ def variance(times):
     return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)


-async def wait_for_done(conf, uri, timeout):
-    name = uri.split("#")[0]
-    last_complete = 0
-    hang_count = 0
+async def wait_for_done(conf, claim_name, timeout):
+    blobs_completed, last_completed = 0, time.perf_counter()
     while True:
-        files = await daemon_rpc(conf, "file_list", claim_name=name)
-        file = files[0]
+        file = (await daemon_rpc(conf, "file_list", claim_name=claim_name))[0]
         if file['status'] in ['finished', 'stopped']:
             return True, file['blobs_completed'], file['blobs_in_stream']
-        if last_complete < int(file['blobs_completed']):
-            hang_count = 0
-            last_complete = int(file['blobs_completed'])
-        else:
-            hang_count += 1
-            await asyncio.sleep(1.0)
-        if hang_count > timeout:
+        elif blobs_completed < int(file['blobs_completed']):
+            blobs_completed, last_completed = int(file['blobs_completed']), time.perf_counter()
+        elif (time.perf_counter() - last_completed) > timeout:
             return False, file['blobs_completed'], file['blobs_in_stream']
+        await asyncio.sleep(1.0)


-async def main(uris=None, cmd_args=None):
-    if not uris:
-        uris = await get_frontpage_uris()
+async def main(cmd_args=None):
+    print('Time to first byte started using parameters:')
+    for key, value in vars(cmd_args).items():
+        print(f"{key}: {value}")
     conf = Config()
+    url_to_claim = {}
     try:
-        await daemon_rpc(conf, 'status')
+        for page in range(1, cmd_args.download_pages + 1):
+            start = time.perf_counter()
+            kwargs = {
+                'page': page,
+                # 'claim_type': 'stream',
+                'order_by': ['trending_global', 'trending_mixed'],
+                'no_totals': True
+            }
+            # if not cmd_args.allow_fees:
+            #     kwargs['fee_amount'] = 0
+            response = await daemon_rpc(
+                conf, 'claim_search', **kwargs
+            )
+            if 'error' in response or not response.get('items'):
+                print(f'Error getting claim list page {page}:')
+                print(response)
+                return 1
+            else:
+                url_to_claim.update({
+                    claim['permanent_url']: claim for claim in response['items']
+                })
+            print(f'Claim search page {page} took: {time.perf_counter() - start}')
     except (ClientConnectorError, ConnectionError):
         print("Could not connect to daemon")
         return 1
-    print(f"Checking {len(uris)} uris from the front page")
     print("**********************************************")
+    print(f"Attempting to download {len(url_to_claim)} claim_search streams")

-    resolvable = []
-    async def __resolve(name):
-        resolved = await daemon_rpc(conf, 'resolve', urls=[name])
-        if 'error' not in resolved.get(name, {}):
-            if ("fee" not in resolved[name]['claim']['value']) or cmd_args.allow_fees:
-                resolvable.append(name)
-            else:
-                print(f"{name} has a fee, skipping it")
-        else:
-            print(f"failed to resolve {name}: {resolved[name]['error']}")
-    await asyncio.gather(*(__resolve(name) for name in uris))
-
-    print(f"attempting to download {len(resolvable)}/{len(uris)} frontpage streams")
-
     first_byte_times = []
     download_speeds = []
     download_successes = []
-    failed_to_start = []
-    download_failures = []
+    failed_to = {}

-    for uri in resolvable:
-        await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+    await asyncio.gather(*(
+        daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=claim['name'])
+        for claim in url_to_claim.values() if not cmd_args.keep_files
+    ))

-    for i, uri in enumerate(resolvable):
-        start = time.time()
-        try:
-            await daemon_rpc(conf, 'get', uri=uri, save_file=True)
-            first_byte = time.time()
-            first_byte_times.append(first_byte - start)
-            print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            if not cmd_args.head_blob_only:
-                downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
-                    conf, uri, cmd_args.stall_download_timeout
-                )
-                if downloaded:
-                    download_successes.append(uri)
-                else:
-                    download_failures.append(uri)
-                mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
-                download_speeds.append(mbs)
-                print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
-                      f"{mbs}mb/s")
-        except Exception as e:
-            print(f"{i + 1}/{len(uris)} - failed to start {uri}: {e}")
-            failed_to_start.append(uri)
+    for i, (url, claim) in enumerate(url_to_claim.items()):
+        start = time.perf_counter()
+        response = await daemon_rpc(conf, 'get', uri=url, save_file=not cmd_args.head_blob_only)
+        if 'error' in response:
+            print(f"{i + 1}/{len(url_to_claim)} - failed to start {url}: {response['error']}")
+            failed_to[url] = 'start'
             if cmd_args.exit_on_error:
                 return
-        if cmd_args.delete_after_download or cmd_args.head_blob_only:
-            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+            continue
+        first_byte = time.perf_counter()
+        first_byte_times.append(first_byte - start)
+        print(f"{i + 1}/{len(url_to_claim)} - {first_byte - start} {url}")
+        if not cmd_args.head_blob_only:
+            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
+                conf, claim['name'], cmd_args.stall_download_timeout
+            )
+            if downloaded:
+                download_successes.append(url)
+            else:
+                failed_to[url] = 'finish'
+            mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.perf_counter() - start) / 1000000, 2)
+            download_speeds.append(mbs)
+            print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {url} at "
                  f"{mbs}mb/s")
+        if not cmd_args.keep_files:
+            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=claim['name'])
         await asyncio.sleep(0.1)

     print("**********************************************")
-    result = f"Started {len(first_byte_times)} of {len(resolvable)} attempted front page streams\n" \
-             f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \
-             f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
-             f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
-             f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
-             f"Variance: {variance(first_byte_times)}\n"
-    if not cmd_args.head_blob_only:
-        result += f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
+    result = f"Started {len(first_byte_times)} of {len(url_to_claim)} attempted front page streams\n"
+    if first_byte_times:
+        result += f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \
+                  f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
+                  f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
+                  f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
+                  f"Variance: {variance(first_byte_times)}\n"
+    if download_successes:
+        result += f"Downloaded {len(download_successes)}/{len(url_to_claim)}\n" \
                   f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
                   f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
                   f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
                   f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"

-    if failed_to_start:
-        result += "\nFailed to start:" + "\n".join([f for f in failed_to_start])
-    if download_failures:
-        result += "\nFailed to finish:" + "\n".join([f for f in download_failures])
+    for reason in ('start', 'finish'):
+        failures = [url for url, why in failed_to.items() if reason == why]
+        if failures:
+            result += f"\nFailed to {reason}:\n" + "\n".join(failures)
     print(result)

     webhook = os.environ.get('TTFB_SLACK_TOKEN', None)
     if webhook:
         await report_to_slack(result, webhook)
+    return 0


 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    #parser.add_argument("--data_dir")
-    #parser.add_argument("--wallet_dir")
-    #parser.add_argument("--download_directory")
     parser.add_argument("--allow_fees", action='store_true')
     parser.add_argument("--exit_on_error", action='store_true')
-    parser.add_argument("--stall_download_timeout", default=10, type=int)
-    parser.add_argument("--delete_after_download", action='store_true')
+    parser.add_argument("--stall_download_timeout", default=0, type=int)
+    parser.add_argument("--keep_files", action='store_true')
     parser.add_argument("--head_blob_only", action='store_true')
-    asyncio.run(main(cmd_args=parser.parse_args()))
+    parser.add_argument("--download_pages", type=int, default=10)
+    sys.exit(asyncio.run(main(cmd_args=parser.parse_args())) or 0)
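One behavioral change worth calling out in the diff above: `wait_for_done` no longer counts polling iterations with a hang counter. `--stall_download_timeout` is now interpreted as seconds without new blobs, measured with `time.perf_counter()`, so the new default of 0 treats any poll that shows no progress as a stall. A simplified, self-contained sketch of that stall-detection loop follows; the `poll_progress` coroutine is a stand-in for the daemon's `file_list` call, not real API.

import asyncio
import time


async def wait_until_done_or_stalled(poll_progress, stall_timeout: float):
    # poll_progress() is a stand-in for the file_list RPC and should return
    # (finished: bool, blobs_completed: int) for the stream being watched.
    blobs_completed, last_progress = 0, time.perf_counter()
    while True:
        finished, completed = await poll_progress()
        if finished:
            return True
        elif blobs_completed < completed:
            # New blobs arrived: record the count and reset the stall clock.
            blobs_completed, last_progress = completed, time.perf_counter()
        elif (time.perf_counter() - last_progress) > stall_timeout:
            # No progress for longer than the allowed stall window: give up.
            return False
        await asyncio.sleep(1.0)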